wireguard: fix re-handshake timer when response sent
[vpp.git] / test / test_wireguard.py
1 #!/usr/bin/env python3
2 """ Wg tests """
3
4 import datetime
5 import base64
6 import os
7
8 from hashlib import blake2s
9 from scapy.packet import Packet
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.layers.inet import IP, UDP
13 from scapy.layers.inet6 import IPv6
14 from scapy.contrib.wireguard import (
15     Wireguard,
16     WireguardResponse,
17     WireguardInitiation,
18     WireguardTransport,
19     WireguardCookieReply,
20 )
21 from cryptography.hazmat.primitives.asymmetric.x25519 import (
22     X25519PrivateKey,
23     X25519PublicKey,
24 )
25 from cryptography.hazmat.primitives.serialization import (
26     Encoding,
27     PrivateFormat,
28     PublicFormat,
29     NoEncryption,
30 )
31 from cryptography.hazmat.primitives.hashes import BLAKE2s, Hash
32 from cryptography.hazmat.primitives.hmac import HMAC
33 from cryptography.hazmat.backends import default_backend
34 from noise.connection import NoiseConnection, Keypair
35
36 from Crypto.Cipher import ChaCha20_Poly1305
37 from Crypto.Random import get_random_bytes
38
39 from vpp_ipip_tun_interface import VppIpIpTunInterface
40 from vpp_interface import VppInterface
41 from vpp_pg_interface import is_ipv6_misc
42 from vpp_ip_route import VppIpRoute, VppRoutePath
43 from vpp_object import VppObject
44 from vpp_papi import VppEnum
45 from framework import tag_fixme_ubuntu2204, tag_fixme_debian11
46 from framework import is_distro_ubuntu2204, is_distro_debian11
47 from framework import VppTestCase
48 from re import compile
49 import unittest
50
51 """ TestWg is a subclass of  VPPTestCase classes.
52
53 Wg test.
54
55 """
56
57
def private_key_bytes(k):
    """Serialize private key *k* to raw bytes, without any encryption."""
    serialization = (Encoding.Raw, PrivateFormat.Raw, NoEncryption())
    return k.private_bytes(*serialization)
60
61
def public_key_bytes(k):
    """Serialize public key *k* to its raw byte representation."""
    raw_public = k.public_bytes(Encoding.Raw, PublicFormat.Raw)
    return raw_public
64
65
def get_field_bytes(pkt, name):
    """Return the value of field *name* of *pkt* after the field's ``i2m``.

    The field descriptor and the current value are both looked up on the
    packet; the value is then passed through the descriptor's ``i2m``
    conversion (scapy's internal-to-machine form).
    """
    field_desc, field_value = pkt.getfield_and_val(name)
    machine_value = field_desc.i2m(pkt, field_value)
    return machine_value
69
70
class VppWgInterface(VppInterface):
    """
    VPP WireGuard interface

    Test-side handle of a WireGuard interface: it owns a freshly generated
    X25519 key pair together with the source address and UDP port, and wraps
    the wireguard_interface_* and want_wireguard_peer_events API calls.
    """

    def __init__(self, test, src, port):
        """Remember the interface parameters and generate a key pair.

        :param test: test case the interface belongs to
        :param src: source address, sent to VPP as "src_ip"
        :param port: UDP port, sent to VPP as "port"
        """
        super(VppWgInterface, self).__init__(test)

        self.port = port
        self.src = src
        self.private_key = X25519PrivateKey.generate()
        self.public_key = self.private_key.public_key()

        # cookie related params
        cookie_hash = blake2s(b"cookie--" + self.public_key_bytes())
        self.cookie_key = cookie_hash.digest()

    def public_key_bytes(self):
        """Return the raw bytes of the interface's public key."""
        return public_key_bytes(self.public_key)

    def private_key_bytes(self):
        """Return the raw bytes of the interface's private key."""
        return private_key_bytes(self.private_key)

    def add_vpp_config(self):
        """Create the interface in VPP, register it for cleanup, return self."""
        interface = {
            "user_instance": 0xFFFFFFFF,
            "port": self.port,
            "src_ip": self.src,
            "private_key": private_key_bytes(self.private_key),
            "generate_key": False,
        }
        reply = self.test.vapi.wireguard_interface_create(interface=interface)
        self.set_sw_if_index(reply.sw_if_index)
        self.test.registry.register(self, self.test.logger)
        return self

    def remove_vpp_config(self):
        """Delete the interface from VPP."""
        self.test.vapi.wireguard_interface_delete(sw_if_index=self._sw_if_index)

    def query_vpp_config(self):
        """Return True if VPP reports an interface matching this object.

        An entry of the dump matches when its sw_if_index, source address,
        port and private key all equal the values held here.
        """
        dump = self.test.vapi.wireguard_interface_dump(sw_if_index=0xFFFFFFFF)
        return any(
            entry.interface.sw_if_index == self._sw_if_index
            and str(entry.interface.src_ip) == self.src
            and entry.interface.port == self.port
            and entry.interface.private_key == private_key_bytes(self.private_key)
            for entry in dump
        )

    def want_events(self, peer_index=0xFFFFFFFF):
        """Subscribe this process to peer events of the interface.

        :param peer_index: peer index passed to the API (0xFFFFFFFF by
            default)
        """
        self.test.vapi.want_wireguard_peer_events(
            enable_disable=1,
            pid=os.getpid(),
            sw_if_index=self._sw_if_index,
            peer_index=peer_index,
        )

    def wait_events(self, expect, peers, timeout=5):
        """Wait for one peer event per entry of *peers*, in order.

        :param expect: flags every received event must carry
        :param peers: peer indexes, in the order the events are expected
        :param timeout: timeout passed to each individual wait
        """
        for peer_index in peers:
            event = self.test.vapi.wait_for_event(timeout, "wireguard_peer_event")
            self.test.assertEqual(event.peer_index, peer_index)
            self.test.assertEqual(event.flags, expect)

    def __str__(self):
        return self.object_id()

    def object_id(self):
        """Return the identifier string built from the sw_if_index."""
        return "wireguard-%d" % self._sw_if_index
141
142
# Noise protocol name handed to NoiseConnection.from_name() by VppWgPeer
NOISE_HANDSHAKE_NAME = b"Noise_IKpsk2_25519_ChaChaPoly_BLAKE2s"
# identifier installed as the Noise prologue in VppWgPeer.noise_init()
NOISE_IDENTIFIER_NAME = b"WireGuard v1 zx2c4 Jason@zx2c4.com"

# The tests use the values below as sleep intervals and as the number of
# handshake messages to send; presumably they mirror limits implemented on
# the VPP side -- TODO confirm against the plugin sources.
HANDSHAKE_COUNTING_INTERVAL = 0.5
UNDER_LOAD_INTERVAL = 1.0
HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD = 40
HANDSHAKE_NUM_BEFORE_RATELIMITING = 5
150
151
class VppWgPeer(VppObject):
    """Test-side model of a WireGuard peer attached to a VppWgInterface.

    The object plays two roles: it configures the peer in VPP through the
    wireguard_peer_* API, and it acts as that remote peer on the wire by
    building and parsing handshake, cookie-reply and transport messages with
    its own X25519 key pair and a NoiseConnection.

    Attributes set outside __init__:
      index           -- peer index returned by VPP (add_vpp_config)
      receiver_index  -- index + 1; used as sender_index of the messages this
                         peer builds and expected as receiver_index of the
                         messages it consumes (add_vpp_config)
      sender          -- sender_index of the last consumed initiation or
                         response (consume_init / consume_response)
    """

    def __init__(self, test, itf, endpoint, port, allowed_ips, persistent_keepalive=15):
        """Store the peer parameters and generate the peer's key pair.

        :param test: test case used for API access and assertions
        :param itf: VppWgInterface the peer is attached to
        :param endpoint: peer address, sent to VPP as "endpoint"
        :param port: peer UDP port, sent to VPP as "port"
        :param allowed_ips: list of prefixes, sent to VPP as "allowed_ips"
        :param persistent_keepalive: value sent to VPP as
            "persistent_keepalive"
        """
        self._test = test
        self.itf = itf
        self.endpoint = endpoint
        self.port = port
        self.allowed_ips = allowed_ips
        self.persistent_keepalive = persistent_keepalive

        # remote peer's public
        self.private_key = X25519PrivateKey.generate()
        self.public_key = self.private_key.public_key()

        # cookie related params
        self.cookie_key = blake2s(b"cookie--" + self.public_key_bytes()).digest()
        self.last_sent_cookie = None
        self.last_mac1 = None
        self.last_received_cookie = None

        self.noise = NoiseConnection.from_name(NOISE_HANDSHAKE_NAME)

    def change_endpoint(self, endpoint, port):
        """Update the locally stored endpoint address and port.

        Only this object is changed; no API call is made here.
        """
        self.endpoint = endpoint
        self.port = port

    def add_vpp_config(self):
        """Add the peer to VPP, record its indexes, register it, return self."""
        rv = self._test.vapi.wireguard_peer_add(
            peer={
                "public_key": self.public_key_bytes(),
                "port": self.port,
                "endpoint": self.endpoint,
                "n_allowed_ips": len(self.allowed_ips),
                "allowed_ips": self.allowed_ips,
                "sw_if_index": self.itf.sw_if_index,
                "persistent_keepalive": self.persistent_keepalive,
            }
        )
        self.index = rv.peer_index
        self.receiver_index = self.index + 1
        self._test.registry.register(self, self._test.logger)
        return self

    def remove_vpp_config(self):
        """Remove the peer from VPP."""
        self._test.vapi.wireguard_peer_remove(peer_index=self.index)

    def object_id(self):
        """Return the identifier string built from the peer index."""
        return "wireguard-peer-%s" % self.index

    def public_key_bytes(self):
        """Return the raw bytes of the peer's public key."""
        return public_key_bytes(self.public_key)

    def query_vpp_config(self):
        """Return True if VPP reports a peer matching this object.

        The first dumped peer whose public key, port, endpoint, interface
        and number of allowed prefixes match decides the result: True when
        its allowed prefixes also match, False otherwise.  False is returned
        as well when no dumped peer matches.
        """
        # "::" endpoint will be returned as "0.0.0.0" in peer's details
        endpoint = "0.0.0.0" if self.endpoint == "::" else self.endpoint
        public_key = self.public_key_bytes()
        # Compare prefixes as sorted strings: both sides are then ordered by
        # the same key whatever their element type, and the caller's
        # allowed_ips list is left in its original order.
        wanted_ips = sorted(str(ip) for ip in self.allowed_ips)

        for p in self._test.vapi.wireguard_peers_dump():
            if (
                p.peer.public_key == public_key
                and p.peer.port == self.port
                and str(p.peer.endpoint) == endpoint
                and p.peer.sw_if_index == self.itf.sw_if_index
                and len(self.allowed_ips) == p.peer.n_allowed_ips
            ):
                reported_ips = sorted(str(ip) for ip in p.peer.allowed_ips)
                return all(a == b for a, b in zip(wanted_ips, reported_ips))
        return False

    def mk_tunnel_header(self, tx_itf, is_ip6=False):
        """Build the Ether / IP(v6) / UDP header of a packet sent by the peer.

        :param tx_itf: interface providing the MAC addresses (the packet
            goes from its remote_mac to its local_mac)
        :param is_ip6: build an IPv6 header instead of an IPv4 one
        """
        ip_layer = IPv6 if is_ip6 else IP
        return (
            Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac)
            / ip_layer(src=self.endpoint, dst=self.itf.src)
            / UDP(sport=self.port, dport=self.itf.port)
        )

    def noise_reset(self):
        """Replace the Noise connection with a fresh one."""
        self.noise = NoiseConnection.from_name(NOISE_HANDSHAKE_NAME)

    def noise_init(self, public_key=None):
        """Configure keys on the Noise connection and start the handshake.

        :param public_key: public key used as the remote static key;
            defaults to the public key of the interface
        """
        self.noise.set_prologue(NOISE_IDENTIFIER_NAME)
        self.noise.set_psks(psk=bytes(bytearray(32)))

        if not public_key:
            public_key = self.itf.public_key

        # local/this private
        self.noise.set_keypair_from_private_bytes(
            Keypair.STATIC, private_key_bytes(self.private_key)
        )
        # remote's public
        self.noise.set_keypair_from_public_bytes(
            Keypair.REMOTE_STATIC, public_key_bytes(public_key)
        )

        self.noise.start_handshake()

    def mk_cookie(self, p, tx_itf, is_resp=False, is_ip6=False):
        """Build a cookie reply answering handshake message *p*.

        :param p: captured packet carrying an initiation, or a response
            when *is_resp* is set; its header, message type and all-zero
            mac2 are asserted
        :param tx_itf: interface the reply is going to be sent on
        :param is_resp: *p* is a handshake response rather than an initiation
        :param is_ip6: the tunnel runs over IPv6
        :returns: the complete cookie-reply packet; the plain cookie is kept
            in last_sent_cookie
        """
        self.verify_header(p, is_ip6)

        wg_pkt = Wireguard(p[Raw])

        # layer class and message type of the handshake message being answered
        if is_resp:
            msg_layer, msg_type = WireguardResponse, 2
        else:
            msg_layer, msg_type = WireguardInitiation, 1

        self._test.assertEqual(wg_pkt[Wireguard].message_type, msg_type)
        self._test.assertEqual(wg_pkt[Wireguard].reserved_zero, 0)
        self._test.assertEqual(wg_pkt[msg_layer].mac2, bytes(16))

        # collect info from wg packet (initiation or response)
        src = get_field_bytes(p[IPv6 if is_ip6 else IP], "src")
        sport = p[UDP].sport.to_bytes(2, byteorder="big")
        mac1 = wg_pkt[msg_layer].mac1
        sender_index = wg_pkt[msg_layer].sender_index

        # make cookie reply
        cookie_reply = Wireguard() / WireguardCookieReply()
        cookie_reply[Wireguard].message_type = 3
        cookie_reply[Wireguard].reserved_zero = 0
        cookie_reply[WireguardCookieReply].receiver_index = sender_index
        nonce = get_random_bytes(24)
        cookie_reply[WireguardCookieReply].nonce = nonce

        # generate cookie data
        changing_secret = get_random_bytes(32)
        self.last_sent_cookie = blake2s(
            src + sport, digest_size=16, key=changing_secret
        ).digest()

        # encrypt cookie data; mac1 of the answered message is fed to the
        # cipher through update() before encrypting
        cipher = ChaCha20_Poly1305.new(key=self.cookie_key, nonce=nonce)
        cipher.update(mac1)
        ciphertext, tag = cipher.encrypt_and_digest(self.last_sent_cookie)
        cookie_reply[WireguardCookieReply].encrypted_cookie = ciphertext + tag

        # prepare cookie reply to be sent
        cookie_reply = self.mk_tunnel_header(tx_itf, is_ip6) / cookie_reply

        return cookie_reply

    def consume_cookie(self, p, is_ip6=False):
        """Verify and decrypt a cookie reply received from VPP.

        The header, message type and receiver index are asserted; the cookie
        is decrypted with the interface's cookie key and last_mac1, and stored
        in last_received_cookie for the next mk_handshake().
        """
        self.verify_header(p, is_ip6)

        cookie_reply = Wireguard(p[Raw])

        self._test.assertEqual(cookie_reply[Wireguard].message_type, 3)
        self._test.assertEqual(cookie_reply[Wireguard].reserved_zero, 0)
        self._test.assertEqual(
            cookie_reply[WireguardCookieReply].receiver_index, self.receiver_index
        )

        # collect info from cookie reply
        nonce = cookie_reply[WireguardCookieReply].nonce
        encrypted_cookie = cookie_reply[WireguardCookieReply].encrypted_cookie
        ciphertext, tag = encrypted_cookie[:16], encrypted_cookie[16:]

        # decrypt cookie data
        cipher = ChaCha20_Poly1305.new(key=self.itf.cookie_key, nonce=nonce)
        cipher.update(self.last_mac1)
        self.last_received_cookie = cipher.decrypt_and_verify(ciphertext, tag)

    def mk_handshake(self, tx_itf, is_ip6=False, public_key=None):
        """Build a handshake initiation sent by the peer.

        :param tx_itf: interface the packet is going to be sent on
        :param is_ip6: the tunnel runs over IPv6
        :param public_key: remote static key for the Noise handshake;
            defaults to the public key of the interface
        :returns: the complete initiation packet; its mac1 is kept in
            last_mac1, and a previously received cookie (if any) is used
            for mac2 and then forgotten
        """
        # struct is not imported at the top of this file, so import it here
        import struct

        self.noise.set_as_initiator()
        self.noise_init(public_key)

        p = Wireguard() / WireguardInitiation()

        p[Wireguard].message_type = 1
        p[Wireguard].reserved_zero = 0
        p[WireguardInitiation].sender_index = self.receiver_index

        # some random data for the message
        #  lifted from the noise protocol's wireguard example
        # (12 bytes, big-endian: seconds offset by 2**62 + 10, then
        #  nanoseconds -- presumably a TAI64N timestamp)
        now = datetime.datetime.now()
        tai = struct.pack(
            "!qi",
            4611686018427387914 + int(now.timestamp()),
            int(now.microsecond * 1e3),
        )
        b = self.noise.write_message(payload=tai)

        # load noise into init message
        p[WireguardInitiation].unencrypted_ephemeral = b[0:32]
        p[WireguardInitiation].encrypted_static = b[32:80]
        p[WireguardInitiation].encrypted_timestamp = b[80:108]

        # generate the mac1 hash
        mac_key = blake2s(b"mac1----" + self.itf.public_key_bytes()).digest()
        mac1 = blake2s(bytes(p)[0:116], digest_size=16, key=mac_key).digest()
        p[WireguardInitiation].mac1 = mac1
        self.last_mac1 = mac1

        # generate the mac2 hash
        if self.last_received_cookie:
            mac2 = blake2s(
                bytes(p)[0:132], digest_size=16, key=self.last_received_cookie
            ).digest()
            p[WireguardInitiation].mac2 = mac2
            self.last_received_cookie = None
        else:
            p[WireguardInitiation].mac2 = bytearray(16)

        p = self.mk_tunnel_header(tx_itf, is_ip6) / p

        return p

    def verify_header(self, p, is_ip6=False):
        """Assert that *p* was sent by the interface to this peer.

        Addresses, UDP ports and the packet checksums are checked.
        """
        ip_layer = IPv6 if is_ip6 else IP
        self._test.assertEqual(p[ip_layer].src, self.itf.src)
        self._test.assertEqual(p[ip_layer].dst, self.endpoint)
        self._test.assertEqual(p[UDP].sport, self.itf.port)
        self._test.assertEqual(p[UDP].dport, self.port)
        self._test.assert_packet_checksums_valid(p)

    def consume_init(self, p, tx_itf, is_ip6=False, is_mac2=False):
        """Verify a handshake initiation from VPP and build the response.

        :param p: captured packet carrying the initiation
        :param tx_itf: interface the response is going to be sent on
        :param is_ip6: the tunnel runs over IPv6
        :param is_mac2: expect mac2 to be computed from last_sent_cookie
            (which is then forgotten) instead of being all zeros
        :returns: the complete handshake-response packet; its mac1 is kept
            in last_mac1 and the initiation's sender_index in sender
        """
        self.noise.set_as_responder()
        self.noise_init(self.itf.public_key)
        self.verify_header(p, is_ip6)

        init = Wireguard(p[Raw])

        self._test.assertEqual(init[Wireguard].message_type, 1)
        self._test.assertEqual(init[Wireguard].reserved_zero, 0)

        self.sender = init[WireguardInitiation].sender_index

        # validate the mac1 hash
        mac_key = blake2s(b"mac1----" + public_key_bytes(self.public_key)).digest()
        mac1 = blake2s(bytes(init)[0:-32], digest_size=16, key=mac_key).digest()
        self._test.assertEqual(init[WireguardInitiation].mac1, mac1)

        # validate the mac2 hash
        if is_mac2:
            self._test.assertNotEqual(init[WireguardInitiation].mac2, bytes(16))
            self._test.assertNotEqual(self.last_sent_cookie, None)
            mac2 = blake2s(
                bytes(init)[0:-16], digest_size=16, key=self.last_sent_cookie
            ).digest()
            self._test.assertEqual(init[WireguardInitiation].mac2, mac2)
            self.last_sent_cookie = None
        else:
            self._test.assertEqual(init[WireguardInitiation].mac2, bytes(16))

        # this passes only unencrypted_ephemeral, encrypted_static,
        # encrypted_timestamp fields of the init; the decrypted payload
        # itself is not needed, reading it advances the handshake
        self.noise.read_message(bytes(init)[8:-32])

        # build the response
        b = self.noise.write_message()
        mac_key = blake2s(b"mac1----" + public_key_bytes(self.itf.public_key)).digest()
        resp = Wireguard(message_type=2, reserved_zero=0) / WireguardResponse(
            sender_index=self.receiver_index,
            receiver_index=self.sender,
            unencrypted_ephemeral=b[0:32],
            encrypted_nothing=b[32:],
        )
        mac1 = blake2s(bytes(resp)[:-32], digest_size=16, key=mac_key).digest()
        resp[WireguardResponse].mac1 = mac1
        self.last_mac1 = mac1

        resp = self.mk_tunnel_header(tx_itf, is_ip6) / resp
        self._test.assertTrue(self.noise.handshake_finished)

        return resp

    def consume_response(self, p, is_ip6=False):
        """Verify a handshake response from VPP and finish the handshake.

        The response's sender_index is kept in sender.
        """
        self.verify_header(p, is_ip6)

        resp = Wireguard(p[Raw])

        self._test.assertEqual(resp[Wireguard].message_type, 2)
        self._test.assertEqual(resp[Wireguard].reserved_zero, 0)
        self._test.assertEqual(
            resp[WireguardResponse].receiver_index, self.receiver_index
        )

        self.sender = resp[WireguardResponse].sender_index

        # bytes 12..60 of the message are handed to noise -- presumably the
        # unencrypted_ephemeral and encrypted_nothing fields
        payload = self.noise.read_message(bytes(resp)[12:60])
        self._test.assertEqual(payload, b"")
        self._test.assertTrue(self.noise.handshake_finished)

    def decrypt_transport(self, p, is_ip6=False):
        """Verify a transport message from VPP and return its decrypted data."""
        self.verify_header(p, is_ip6)

        p = Wireguard(p[Raw])
        self._test.assertEqual(p[Wireguard].message_type, 4)
        self._test.assertEqual(p[Wireguard].reserved_zero, 0)
        self._test.assertEqual(
            p[WireguardTransport].receiver_index, self.receiver_index
        )

        return self.noise.decrypt(p[WireguardTransport].encrypted_encapsulated_packet)

    def encrypt_transport(self, p):
        """Return the bytes of *p* encrypted with the Noise connection."""
        return self.noise.encrypt(bytes(p))

    def validate_encapped(self, rxs, tx, is_ip6=False):
        """Check that every packet of *rxs* encapsulates the sent packet *tx*.

        Each received packet is decrypted and parsed as IPv4 or IPv6; its
        destination must equal that of *tx* and its TTL / hop limit must be
        one less.
        """
        for rx in rxs:
            inner_bytes = self.decrypt_transport(rx, is_ip6=is_ip6)

            # check the original packet is present
            if is_ip6:
                inner = IPv6(inner_bytes)
                self._test.assertEqual(inner[IPv6].dst, tx[IPv6].dst)
                self._test.assertEqual(inner[IPv6].hlim, tx[IPv6].hlim - 1)
            else:
                inner = IP(inner_bytes)
                self._test.assertEqual(inner[IP].dst, tx[IP].dst)
                self._test.assertEqual(inner[IP].ttl, tx[IP].ttl - 1)

    def want_events(self):
        """Subscribe this process to the events of this peer."""
        self._test.vapi.want_wireguard_peer_events(
            enable_disable=1,
            pid=os.getpid(),
            peer_index=self.index,
            sw_if_index=self.itf.sw_if_index,
        )

    def wait_event(self, expect, timeout=5):
        """Wait for one peer event and assert its flags and peer index."""
        rv = self._test.vapi.wait_for_event(timeout, "wireguard_peer_event")
        self._test.assertEqual(rv.flags, expect)
        self._test.assertEqual(rv.peer_index, self.index)
497
498
def is_handshake_init(p):
    """Tell whether captured packet *p* carries a handshake initiation.

    The Raw payload of *p* is parsed as a Wireguard message and its
    message_type is compared with 1, the value this file uses for
    initiations.
    """
    message_type = Wireguard(p[Raw])[Wireguard].message_type
    return message_type == 1
503
504
@tag_fixme_ubuntu2204
@tag_fixme_debian11
class TestWg(VppTestCase):
    """Wireguard Test Case"""

    # compiled pattern matching the word "Error"
    error_str = compile(r"Error")

    # prefixes of the error-counter paths of the wireguard graph nodes
    wg4_output_node_name = "/err/wg4-output-tun/"
    wg4_input_node_name = "/err/wg4-input/"
    wg6_output_node_name = "/err/wg6-output-tun/"
    wg6_input_node_name = "/err/wg6-input/"
    # full error-counter paths; setUp() reads each of them through
    # self.statistics.get_err_counter() to record a per-test baseline
    kp4_error = wg4_output_node_name + "Keypair error"
    mac4_error = wg4_input_node_name + "Invalid MAC handshake"
    peer4_in_err = wg4_input_node_name + "Peer error"
    peer4_out_err = wg4_output_node_name + "Peer error"
    kp6_error = wg6_output_node_name + "Keypair error"
    mac6_error = wg6_input_node_name + "Invalid MAC handshake"
    peer6_in_err = wg6_input_node_name + "Peer error"
    peer6_out_err = wg6_output_node_name + "Peer error"
    cookie_dec4_err = wg4_input_node_name + "Failed during Cookie decryption"
    cookie_dec6_err = wg6_input_node_name + "Failed during Cookie decryption"
    ratelimited4_err = wg4_input_node_name + "Handshake ratelimited"
    ratelimited6_err = wg6_input_node_name + "Handshake ratelimited"
528
529     @classmethod
530     def setUpClass(cls):
531         super(TestWg, cls).setUpClass()
532         if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
533             cls, "vpp"
534         ):
535             return
536         try:
537             cls.create_pg_interfaces(range(3))
538             for i in cls.pg_interfaces:
539                 i.admin_up()
540                 i.config_ip4()
541                 i.config_ip6()
542                 i.resolve_arp()
543                 i.resolve_ndp()
544
545         except Exception:
546             super(TestWg, cls).tearDownClass()
547             raise
548
    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent class."""
        super(TestWg, cls).tearDownClass()
552
553     def setUp(self):
554         super(VppTestCase, self).setUp()
555         self.base_kp4_err = self.statistics.get_err_counter(self.kp4_error)
556         self.base_mac4_err = self.statistics.get_err_counter(self.mac4_error)
557         self.base_peer4_in_err = self.statistics.get_err_counter(self.peer4_in_err)
558         self.base_peer4_out_err = self.statistics.get_err_counter(self.peer4_out_err)
559         self.base_kp6_err = self.statistics.get_err_counter(self.kp6_error)
560         self.base_mac6_err = self.statistics.get_err_counter(self.mac6_error)
561         self.base_peer6_in_err = self.statistics.get_err_counter(self.peer6_in_err)
562         self.base_peer6_out_err = self.statistics.get_err_counter(self.peer6_out_err)
563         self.base_cookie_dec4_err = self.statistics.get_err_counter(
564             self.cookie_dec4_err
565         )
566         self.base_cookie_dec6_err = self.statistics.get_err_counter(
567             self.cookie_dec6_err
568         )
569         self.base_ratelimited4_err = self.statistics.get_err_counter(
570             self.ratelimited4_err
571         )
572         self.base_ratelimited6_err = self.statistics.get_err_counter(
573             self.ratelimited6_err
574         )
575
576     def send_and_assert_no_replies_ignoring_init(
577         self, intf, pkts, remark="", timeout=None
578     ):
579         self.pg_send(intf, pkts)
580
581         def _filter_out_fn(p):
582             return is_ipv6_misc(p) or is_handshake_init(p)
583
584         try:
585             if not timeout:
586                 timeout = 1
587             for i in self.pg_interfaces:
588                 i.assert_nothing_captured(
589                     timeout=timeout, remark=remark, filter_out_fn=_filter_out_fn
590                 )
591                 timeout = 0.1
592         finally:
593             pass
594
595     def test_wg_interface(self):
596         """Simple interface creation"""
597         port = 12312
598
599         # Create interface
600         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
601
602         self.logger.info(self.vapi.cli("sh int"))
603
604         # delete interface
605         wg0.remove_vpp_config()
606
607     def test_handshake_hash(self):
608         """test hashing an init message"""
609         # a init packet generated by linux given the key below
610         h = (
611             "0100000098b9032b"
612             "55cc4b39e73c3d24"
613             "a2a1ab884b524a81"
614             "1808bb86640fb70d"
615             "e93154fec1879125"
616             "ab012624a27f0b75"
617             "c0a2582f438ddb5f"
618             "8e768af40b4ab444"
619             "02f9ff473e1b797e"
620             "80d39d93c5480c82"
621             "a3d4510f70396976"
622             "586fb67300a5167b"
623             "ae6ca3ff3dfd00eb"
624             "59be198810f5aa03"
625             "6abc243d2155ee4f"
626             "2336483900aef801"
627             "08752cd700000000"
628             "0000000000000000"
629             "00000000"
630         )
631
632         b = bytearray.fromhex(h)
633         tgt = Wireguard(b)
634
635         pubb = base64.b64decode("aRuHFTTxICIQNefp05oKWlJv3zgKxb8+WW7JJMh0jyM=")
636         pub = X25519PublicKey.from_public_bytes(pubb)
637
638         self.assertEqual(pubb, public_key_bytes(pub))
639
640         # strip the macs and build a new packet
641         init = b[0:-32]
642         mac_key = blake2s(b"mac1----" + public_key_bytes(pub)).digest()
643         init += blake2s(init, digest_size=16, key=mac_key).digest()
644         init += b"\x00" * 16
645
646         act = Wireguard(init)
647
648         self.assertEqual(tgt, act)
649
650     def _test_wg_send_cookie_tmpl(self, is_resp, is_ip6):
651         port = 12323
652
653         # create wg interface
654         if is_ip6:
655             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
656             wg0.admin_up()
657             wg0.config_ip6()
658         else:
659             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
660             wg0.admin_up()
661             wg0.config_ip4()
662
663         self.pg_enable_capture(self.pg_interfaces)
664         self.pg_start()
665
666         # create a peer
667         if is_ip6:
668             peer_1 = VppWgPeer(
669                 self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
670             ).add_vpp_config()
671         else:
672             peer_1 = VppWgPeer(
673                 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
674             ).add_vpp_config()
675         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
676
677         if is_resp:
678             # prepare and send a handshake initiation
679             # expect the peer to send a handshake response
680             init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
681             rxs = self.send_and_expect(self.pg1, [init], self.pg1)
682         else:
683             # wait for the peer to send a handshake initiation
684             rxs = self.pg1.get_capture(1, timeout=2)
685
686         # prepare and send a wrong cookie reply
687         # expect no replies and the cookie error incremented
688         cookie = peer_1.mk_cookie(rxs[0], self.pg1, is_resp=is_resp, is_ip6=is_ip6)
689         cookie.nonce = b"1234567890"
690         self.send_and_assert_no_replies(self.pg1, [cookie], timeout=0.1)
691         if is_ip6:
692             self.assertEqual(
693                 self.base_cookie_dec6_err + 1,
694                 self.statistics.get_err_counter(self.cookie_dec6_err),
695             )
696         else:
697             self.assertEqual(
698                 self.base_cookie_dec4_err + 1,
699                 self.statistics.get_err_counter(self.cookie_dec4_err),
700             )
701
702         # prepare and send a correct cookie reply
703         cookie = peer_1.mk_cookie(rxs[0], self.pg1, is_resp=is_resp, is_ip6=is_ip6)
704         self.pg_send(self.pg1, [cookie])
705
706         # wait for the peer to send a handshake initiation with mac2 set
707         rxs = self.pg1.get_capture(1, timeout=6)
708
709         # verify the initiation and its mac2
710         peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6, is_mac2=True)
711
712         # remove configs
713         peer_1.remove_vpp_config()
714         wg0.remove_vpp_config()
715
    def test_wg_send_cookie_on_init_v4(self):
        """Send cookie on handshake initiation (v4)"""
        # the cookie reply answers VPP's own initiation; tunnel over IPv4
        self._test_wg_send_cookie_tmpl(is_resp=False, is_ip6=False)
719
    def test_wg_send_cookie_on_init_v6(self):
        """Send cookie on handshake initiation (v6)"""
        # the cookie reply answers VPP's own initiation; tunnel over IPv6
        self._test_wg_send_cookie_tmpl(is_resp=False, is_ip6=True)
723
    def test_wg_send_cookie_on_resp_v4(self):
        """Send cookie on handshake response (v4)"""
        # the cookie reply answers VPP's handshake response; tunnel over IPv4
        self._test_wg_send_cookie_tmpl(is_resp=True, is_ip6=False)
727
    def test_wg_send_cookie_on_resp_v6(self):
        """Send cookie on handshake response (v6)"""
        # the cookie reply answers VPP's handshake response; tunnel over IPv6
        self._test_wg_send_cookie_tmpl(is_resp=True, is_ip6=True)
731
732     def _test_wg_receive_cookie_tmpl(self, is_resp, is_ip6):
733         port = 12323
734
735         # create wg interface
736         if is_ip6:
737             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
738             wg0.admin_up()
739             wg0.config_ip6()
740         else:
741             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
742             wg0.admin_up()
743             wg0.config_ip4()
744
745         self.pg_enable_capture(self.pg_interfaces)
746         self.pg_start()
747
748         # create a peer
749         if is_ip6:
750             peer_1 = VppWgPeer(
751                 self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
752             ).add_vpp_config()
753         else:
754             peer_1 = VppWgPeer(
755                 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
756             ).add_vpp_config()
757         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
758
759         if is_resp:
760             # wait for the peer to send a handshake initiation
761             rxs = self.pg1.get_capture(1, timeout=2)
762             # prepare and send a bunch of handshake responses
763             # expect to switch to under load state
764             resp = peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6)
765             txs = [resp] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
766             rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
767             # reset noise to be able to turn into initiator later
768             peer_1.noise_reset()
769         else:
770             # prepare and send a bunch of handshake initiations
771             # expect to switch to under load state
772             init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
773             txs = [init] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
774             rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
775
776         # expect the peer to send a cookie reply
777         peer_1.consume_cookie(rxs[-1], is_ip6=is_ip6)
778
779         # prepare and send a handshake initiation with wrong mac2
780         # expect a cookie reply
781         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
782         init.mac2 = b"1234567890"
783         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
784         peer_1.consume_cookie(rxs[0], is_ip6=is_ip6)
785
786         # prepare and send a handshake initiation with correct mac2
787         # expect a handshake response
788         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
789         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
790
791         # verify the response
792         peer_1.consume_response(rxs[0], is_ip6=is_ip6)
793
794         # clear up under load state
795         self.sleep(UNDER_LOAD_INTERVAL)
796
797         # remove configs
798         peer_1.remove_vpp_config()
799         wg0.remove_vpp_config()
800
    def test_wg_receive_cookie_on_init_v4(self):
        """Receive cookie on handshake initiation (v4)"""
        # VPP is flooded with handshake initiations; tunnel over IPv4
        self._test_wg_receive_cookie_tmpl(is_resp=False, is_ip6=False)
804
    def test_wg_receive_cookie_on_init_v6(self):
        """Receive cookie on handshake initiation (v6)"""
        # VPP is flooded with handshake initiations; tunnel over IPv6
        self._test_wg_receive_cookie_tmpl(is_resp=False, is_ip6=True)
808
809     def test_wg_receive_cookie_on_resp_v4(self):
810         """Receive cookie on handshake response (v4)"""
811         self._test_wg_receive_cookie_tmpl(is_resp=True, is_ip6=False)
812
813     def test_wg_receive_cookie_on_resp_v6(self):
814         """Receive cookie on handshake response (v6)"""
815         self._test_wg_receive_cookie_tmpl(is_resp=True, is_ip6=True)
816
817     def test_wg_under_load_interval(self):
818         """Under load interval"""
819         port = 12323
820
821         # create wg interface
822         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
823         wg0.admin_up()
824         wg0.config_ip4()
825
826         self.pg_enable_capture(self.pg_interfaces)
827         self.pg_start()
828
829         # create a peer
830         peer_1 = VppWgPeer(
831             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
832         ).add_vpp_config()
833         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
834
835         # prepare and send a bunch of handshake initiations
836         # expect to switch to under load state
837         init = peer_1.mk_handshake(self.pg1)
838         txs = [init] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
839         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
840
841         # expect the peer to send a cookie reply
842         peer_1.consume_cookie(rxs[-1])
843
844         # sleep till the next counting interval
845         # expect under load state is still active
846         self.sleep(HANDSHAKE_COUNTING_INTERVAL)
847
848         # prepare and send a handshake initiation with wrong mac2
849         # expect a cookie reply
850         init = peer_1.mk_handshake(self.pg1)
851         init.mac2 = b"1234567890"
852         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
853         peer_1.consume_cookie(rxs[0])
854
855         # sleep till the end of being under load
856         # expect under load state is over
857         self.sleep(UNDER_LOAD_INTERVAL - HANDSHAKE_COUNTING_INTERVAL)
858
859         # prepare and send a handshake initiation with wrong mac2
860         # expect a handshake response
861         init = peer_1.mk_handshake(self.pg1)
862         init.mac2 = b"1234567890"
863         rxs = self.send_and_expect(self.pg1, [init], self.pg1)
864
865         # verify the response
866         peer_1.consume_response(rxs[0])
867
868         # remove configs
869         peer_1.remove_vpp_config()
870         wg0.remove_vpp_config()
871
872     def _test_wg_handshake_ratelimiting_tmpl(self, is_ip6):
873         port = 12323
874
875         # create wg interface
876         if is_ip6:
877             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
878             wg0.admin_up()
879             wg0.config_ip6()
880         else:
881             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
882             wg0.admin_up()
883             wg0.config_ip4()
884
885         self.pg_enable_capture(self.pg_interfaces)
886         self.pg_start()
887
888         # create a peer
889         if is_ip6:
890             peer_1 = VppWgPeer(
891                 self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
892             ).add_vpp_config()
893         else:
894             peer_1 = VppWgPeer(
895                 self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
896             ).add_vpp_config()
897         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
898
899         # prepare and send a bunch of handshake initiations
900         # expect to switch to under load state
901         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
902         txs = [init] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
903         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
904
905         # expect the peer to send a cookie reply
906         peer_1.consume_cookie(rxs[-1], is_ip6=is_ip6)
907
908         # prepare and send a bunch of handshake initiations with correct mac2
909         # expect a handshake response and then ratelimiting
910         NUM_TO_REJECT = 10
911         init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
912         txs = [init] * (HANDSHAKE_NUM_BEFORE_RATELIMITING + NUM_TO_REJECT)
913         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
914
915         if is_ip6:
916             self.assertEqual(
917                 self.base_ratelimited6_err + NUM_TO_REJECT,
918                 self.statistics.get_err_counter(self.ratelimited6_err),
919             )
920         else:
921             self.assertEqual(
922                 self.base_ratelimited4_err + NUM_TO_REJECT,
923                 self.statistics.get_err_counter(self.ratelimited4_err),
924             )
925
926         # verify the response
927         peer_1.consume_response(rxs[0], is_ip6=is_ip6)
928
929         # clear up under load state
930         self.sleep(UNDER_LOAD_INTERVAL)
931
932         # remove configs
933         peer_1.remove_vpp_config()
934         wg0.remove_vpp_config()
935
936     def test_wg_handshake_ratelimiting_v4(self):
937         """Handshake ratelimiting (v4)"""
938         self._test_wg_handshake_ratelimiting_tmpl(is_ip6=False)
939
940     def test_wg_handshake_ratelimiting_v6(self):
941         """Handshake ratelimiting (v6)"""
942         self._test_wg_handshake_ratelimiting_tmpl(is_ip6=True)
943
944     def test_wg_handshake_ratelimiting_multi_peer(self):
945         """Handshake ratelimiting (multiple peer)"""
946         port = 12323
947
948         # create wg interface
949         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
950         wg0.admin_up()
951         wg0.config_ip4()
952
953         self.pg_enable_capture(self.pg_interfaces)
954         self.pg_start()
955
956         # create two peers
957         NUM_PEERS = 2
958         self.pg1.generate_remote_hosts(NUM_PEERS)
959         self.pg1.configure_ipv4_neighbors()
960
961         peer_1 = VppWgPeer(
962             self, wg0, self.pg1.remote_hosts[0].ip4, port + 1, ["10.11.3.0/24"]
963         ).add_vpp_config()
964         peer_2 = VppWgPeer(
965             self, wg0, self.pg1.remote_hosts[1].ip4, port + 1, ["10.11.4.0/24"]
966         ).add_vpp_config()
967         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 2)
968
969         # (peer_1) prepare and send a bunch of handshake initiations
970         # expect not to switch to under load state
971         init_1 = peer_1.mk_handshake(self.pg1)
972         txs = [init_1] * HANDSHAKE_NUM_PER_PEER_UNTIL_UNDER_LOAD
973         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
974
975         # (peer_1) expect the peer to send a handshake response
976         peer_1.consume_response(rxs[0])
977         peer_1.noise_reset()
978
979         # (peer_1) send another bunch of handshake initiations
980         # expect to switch to under load state
981         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
982
983         # (peer_1) expect the peer to send a cookie reply
984         peer_1.consume_cookie(rxs[-1])
985
986         # (peer_2) prepare and send a handshake initiation
987         # expect a cookie reply
988         init_2 = peer_2.mk_handshake(self.pg1)
989         rxs = self.send_and_expect(self.pg1, [init_2], self.pg1)
990         peer_2.consume_cookie(rxs[0])
991
992         # (peer_1) prepare and send a bunch of handshake initiations with correct mac2
993         # expect no ratelimiting and a handshake response
994         init_1 = peer_1.mk_handshake(self.pg1)
995         txs = [init_1] * HANDSHAKE_NUM_BEFORE_RATELIMITING
996         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
997         self.assertEqual(
998             self.base_ratelimited4_err,
999             self.statistics.get_err_counter(self.ratelimited4_err),
1000         )
1001
1002         # (peer_1) verify the response
1003         peer_1.consume_response(rxs[0])
1004         peer_1.noise_reset()
1005
1006         # (peer_1) send another two handshake initiations with correct mac2
1007         # expect ratelimiting
1008         # (peer_2) prepare and send a handshake initiation with correct mac2
1009         # expect no ratelimiting and a handshake response
1010         init_2 = peer_2.mk_handshake(self.pg1)
1011         txs = [init_1, init_2, init_1]
1012         rxs = self.send_and_expect_some(self.pg1, txs, self.pg1)
1013
1014         # (peer_1) verify ratelimiting
1015         self.assertEqual(
1016             self.base_ratelimited4_err + 2,
1017             self.statistics.get_err_counter(self.ratelimited4_err),
1018         )
1019
1020         # (peer_2) verify the response
1021         peer_2.consume_response(rxs[0])
1022
1023         # clear up under load state
1024         self.sleep(UNDER_LOAD_INTERVAL)
1025
1026         # remove configs
1027         peer_1.remove_vpp_config()
1028         peer_2.remove_vpp_config()
1029         wg0.remove_vpp_config()
1030
1031     def _test_wg_peer_roaming_on_handshake_tmpl(self, is_endpoint_set, is_resp, is_ip6):
1032         port = 12323
1033
1034         # create wg interface
1035         if is_ip6:
1036             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1037             wg0.admin_up()
1038             wg0.config_ip6()
1039         else:
1040             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1041             wg0.admin_up()
1042             wg0.config_ip4()
1043
1044         self.pg_enable_capture(self.pg_interfaces)
1045         self.pg_start()
1046
1047         # create more remote hosts
1048         NUM_REMOTE_HOSTS = 2
1049         self.pg1.generate_remote_hosts(NUM_REMOTE_HOSTS)
1050         if is_ip6:
1051             self.pg1.configure_ipv6_neighbors()
1052         else:
1053             self.pg1.configure_ipv4_neighbors()
1054
1055         # create a peer
1056         if is_ip6:
1057             peer_1 = VppWgPeer(
1058                 test=self,
1059                 itf=wg0,
1060                 endpoint=self.pg1.remote_hosts[0].ip6 if is_endpoint_set else "::",
1061                 port=port + 1 if is_endpoint_set else 0,
1062                 allowed_ips=["1::3:0/112"],
1063             ).add_vpp_config()
1064         else:
1065             peer_1 = VppWgPeer(
1066                 test=self,
1067                 itf=wg0,
1068                 endpoint=self.pg1.remote_hosts[0].ip4 if is_endpoint_set else "0.0.0.0",
1069                 port=port + 1 if is_endpoint_set else 0,
1070                 allowed_ips=["10.11.3.0/24"],
1071             ).add_vpp_config()
1072         self.assertTrue(peer_1.query_vpp_config())
1073
1074         if is_resp:
1075             # wait for the peer to send a handshake initiation
1076             rxs = self.pg1.get_capture(1, timeout=2)
1077             # prepare a handshake response
1078             resp = peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6)
1079             # change endpoint
1080             if is_ip6:
1081                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip6, port + 100)
1082                 resp[IPv6].src, resp[UDP].sport = peer_1.endpoint, peer_1.port
1083             else:
1084                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip4, port + 100)
1085                 resp[IP].src, resp[UDP].sport = peer_1.endpoint, peer_1.port
1086             # send the handshake response
1087             # expect a keepalive message sent to the new endpoint
1088             rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
1089             # verify the keepalive message
1090             b = peer_1.decrypt_transport(rxs[0], is_ip6=is_ip6)
1091             self.assertEqual(0, len(b))
1092         else:
1093             # change endpoint
1094             if is_ip6:
1095                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip6, port + 100)
1096             else:
1097                 peer_1.change_endpoint(self.pg1.remote_hosts[1].ip4, port + 100)
1098             # prepare and send a handshake initiation
1099             # expect a handshake response sent to the new endpoint
1100             init = peer_1.mk_handshake(self.pg1, is_ip6=is_ip6)
1101             rxs = self.send_and_expect(self.pg1, [init], self.pg1)
1102             # verify the response
1103             peer_1.consume_response(rxs[0], is_ip6=is_ip6)
1104         self.assertTrue(peer_1.query_vpp_config())
1105
1106         # remove configs
1107         peer_1.remove_vpp_config()
1108         wg0.remove_vpp_config()
1109
1110     def test_wg_peer_roaming_on_init_v4(self):
1111         """Peer roaming on handshake initiation (v4)"""
1112         self._test_wg_peer_roaming_on_handshake_tmpl(
1113             is_endpoint_set=False, is_resp=False, is_ip6=False
1114         )
1115
1116     def test_wg_peer_roaming_on_init_v6(self):
1117         """Peer roaming on handshake initiation (v6)"""
1118         self._test_wg_peer_roaming_on_handshake_tmpl(
1119             is_endpoint_set=False, is_resp=False, is_ip6=True
1120         )
1121
1122     def test_wg_peer_roaming_on_resp_v4(self):
1123         """Peer roaming on handshake response (v4)"""
1124         self._test_wg_peer_roaming_on_handshake_tmpl(
1125             is_endpoint_set=True, is_resp=True, is_ip6=False
1126         )
1127
1128     def test_wg_peer_roaming_on_resp_v6(self):
1129         """Peer roaming on handshake response (v6)"""
1130         self._test_wg_peer_roaming_on_handshake_tmpl(
1131             is_endpoint_set=True, is_resp=True, is_ip6=True
1132         )
1133
1134     def _test_wg_peer_roaming_on_data_tmpl(self, is_async, is_ip6):
1135         self.vapi.wg_set_async_mode(is_async)
1136         port = 12323
1137
1138         # create wg interface
1139         if is_ip6:
1140             wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1141             wg0.admin_up()
1142             wg0.config_ip6()
1143         else:
1144             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1145             wg0.admin_up()
1146             wg0.config_ip4()
1147
1148         self.pg_enable_capture(self.pg_interfaces)
1149         self.pg_start()
1150
1151         # create more remote hosts
1152         NUM_REMOTE_HOSTS = 2
1153         self.pg1.generate_remote_hosts(NUM_REMOTE_HOSTS)
1154         if is_ip6:
1155             self.pg1.configure_ipv6_neighbors()
1156         else:
1157             self.pg1.configure_ipv4_neighbors()
1158
1159         # create a peer
1160         if is_ip6:
1161             peer_1 = VppWgPeer(
1162                 self, wg0, self.pg1.remote_hosts[0].ip6, port + 1, ["1::3:0/112"]
1163             ).add_vpp_config()
1164         else:
1165             peer_1 = VppWgPeer(
1166                 self, wg0, self.pg1.remote_hosts[0].ip4, port + 1, ["10.11.3.0/24"]
1167             ).add_vpp_config()
1168         self.assertTrue(peer_1.query_vpp_config())
1169
1170         # create a route to rewrite traffic into the wg interface
1171         if is_ip6:
1172             r1 = VppIpRoute(
1173                 self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
1174             ).add_vpp_config()
1175         else:
1176             r1 = VppIpRoute(
1177                 self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1178             ).add_vpp_config()
1179
1180         # wait for the peer to send a handshake initiation
1181         rxs = self.pg1.get_capture(1, timeout=2)
1182
1183         # prepare and send a handshake response
1184         # expect a keepalive message
1185         resp = peer_1.consume_init(rxs[0], self.pg1, is_ip6=is_ip6)
1186         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
1187
1188         # verify the keepalive message
1189         b = peer_1.decrypt_transport(rxs[0], is_ip6=is_ip6)
1190         self.assertEqual(0, len(b))
1191
1192         # change endpoint
1193         if is_ip6:
1194             peer_1.change_endpoint(self.pg1.remote_hosts[1].ip6, port + 100)
1195         else:
1196             peer_1.change_endpoint(self.pg1.remote_hosts[1].ip4, port + 100)
1197
1198         # prepare and send a data packet
1199         # expect endpoint change
1200         if is_ip6:
1201             ip_header = IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1202         else:
1203             ip_header = IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1204         data = (
1205             peer_1.mk_tunnel_header(self.pg1, is_ip6=is_ip6)
1206             / Wireguard(message_type=4, reserved_zero=0)
1207             / WireguardTransport(
1208                 receiver_index=peer_1.sender,
1209                 counter=0,
1210                 encrypted_encapsulated_packet=peer_1.encrypt_transport(
1211                     ip_header / UDP(sport=222, dport=223) / Raw()
1212                 ),
1213             )
1214         )
1215         rxs = self.send_and_expect(self.pg1, [data], self.pg0)
1216         if is_ip6:
1217             self.assertEqual(rxs[0][IPv6].dst, self.pg0.remote_ip6)
1218             self.assertEqual(rxs[0][IPv6].hlim, 19)
1219         else:
1220             self.assertEqual(rxs[0][IP].dst, self.pg0.remote_ip4)
1221             self.assertEqual(rxs[0][IP].ttl, 19)
1222         self.assertTrue(peer_1.query_vpp_config())
1223
1224         # prepare and send a packet that will be rewritten into the wg interface
1225         # expect a data packet sent to the new endpoint
1226         if is_ip6:
1227             ip_header = IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1228         else:
1229             ip_header = IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1230         p = (
1231             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1232             / ip_header
1233             / UDP(sport=555, dport=556)
1234             / Raw()
1235         )
1236         rxs = self.send_and_expect(self.pg0, [p], self.pg1)
1237
1238         # verify the data packet
1239         peer_1.validate_encapped(rxs, p, is_ip6=is_ip6)
1240
1241         # remove configs
1242         r1.remove_vpp_config()
1243         peer_1.remove_vpp_config()
1244         wg0.remove_vpp_config()
1245
1246     def test_wg_peer_roaming_on_data_v4_sync(self):
1247         """Peer roaming on data packet (v4, sync)"""
1248         self._test_wg_peer_roaming_on_data_tmpl(is_async=False, is_ip6=False)
1249
1250     def test_wg_peer_roaming_on_data_v6_sync(self):
1251         """Peer roaming on data packet (v6, sync)"""
1252         self._test_wg_peer_roaming_on_data_tmpl(is_async=False, is_ip6=True)
1253
1254     def test_wg_peer_roaming_on_data_v4_async(self):
1255         """Peer roaming on data packet (v4, async)"""
1256         self._test_wg_peer_roaming_on_data_tmpl(is_async=True, is_ip6=False)
1257
1258     def test_wg_peer_roaming_on_data_v6_async(self):
1259         """Peer roaming on data packet (v6, async)"""
1260         self._test_wg_peer_roaming_on_data_tmpl(is_async=True, is_ip6=True)
1261
1262     def test_wg_peer_resp(self):
1263         """Send handshake response"""
1264         port = 12323
1265
1266         # Create interfaces
1267         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1268         wg0.admin_up()
1269         wg0.config_ip4()
1270
1271         self.pg_enable_capture(self.pg_interfaces)
1272         self.pg_start()
1273
1274         peer_1 = VppWgPeer(
1275             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
1276         ).add_vpp_config()
1277         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1278
1279         r1 = VppIpRoute(
1280             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1281         ).add_vpp_config()
1282
1283         # wait for the peer to send a handshake
1284         rx = self.pg1.get_capture(1, timeout=2)
1285
1286         # consume the handshake in the noise protocol and
1287         # generate the response
1288         resp = peer_1.consume_init(rx[0], self.pg1)
1289
1290         # send the response, get keepalive
1291         rxs = self.send_and_expect(self.pg1, [resp], self.pg1)
1292
1293         for rx in rxs:
1294             b = peer_1.decrypt_transport(rx)
1295             self.assertEqual(0, len(b))
1296
1297         # send a packets that are routed into the tunnel
1298         p = (
1299             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1300             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1301             / UDP(sport=555, dport=556)
1302             / Raw(b"\x00" * 80)
1303         )
1304
1305         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1306
1307         peer_1.validate_encapped(rxs, p)
1308
1309         # send packets into the tunnel, expect to receive them on
1310         # the other side
1311         p = [
1312             (
1313                 peer_1.mk_tunnel_header(self.pg1)
1314                 / Wireguard(message_type=4, reserved_zero=0)
1315                 / WireguardTransport(
1316                     receiver_index=peer_1.sender,
1317                     counter=ii,
1318                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1319                         (
1320                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1321                             / UDP(sport=222, dport=223)
1322                             / Raw()
1323                         )
1324                     ),
1325                 )
1326             )
1327             for ii in range(255)
1328         ]
1329
1330         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1331
1332         for rx in rxs:
1333             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1334             self.assertEqual(rx[IP].ttl, 19)
1335
1336         r1.remove_vpp_config()
1337         peer_1.remove_vpp_config()
1338         wg0.remove_vpp_config()
1339
1340     def test_wg_peer_v4o4(self):
1341         """Test v4o4"""
1342
1343         port = 12333
1344
1345         # Create interfaces
1346         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1347         wg0.admin_up()
1348         wg0.config_ip4()
1349
1350         peer_1 = VppWgPeer(
1351             self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
1352         ).add_vpp_config()
1353         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1354
1355         r1 = VppIpRoute(
1356             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1357         ).add_vpp_config()
1358         r2 = VppIpRoute(
1359             self, "20.22.3.0", 24, [VppRoutePath("20.22.3.1", wg0.sw_if_index)]
1360         ).add_vpp_config()
1361
1362         # route a packet into the wg interface
1363         #  use the allowed-ip prefix
1364         #  this is dropped because the peer is not initiated
1365         p = (
1366             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1367             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1368             / UDP(sport=555, dport=556)
1369             / Raw()
1370         )
1371         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1372         self.assertEqual(
1373             self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
1374         )
1375
1376         # route a packet into the wg interface
1377         #  use a not allowed-ip prefix
1378         #  this is dropped because there is no matching peer
1379         p = (
1380             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1381             / IP(src=self.pg0.remote_ip4, dst="20.22.3.2")
1382             / UDP(sport=555, dport=556)
1383             / Raw()
1384         )
1385         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1386         self.assertEqual(
1387             self.base_peer4_out_err + 1,
1388             self.statistics.get_err_counter(self.peer4_out_err),
1389         )
1390
1391         # send a handsake from the peer with an invalid MAC
1392         p = peer_1.mk_handshake(self.pg1)
1393         p[WireguardInitiation].mac1 = b"foobar"
1394         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1395         self.assertEqual(
1396             self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
1397         )
1398
1399         # send a handsake from the peer but signed by the wrong key.
1400         p = peer_1.mk_handshake(
1401             self.pg1, False, X25519PrivateKey.generate().public_key()
1402         )
1403         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1404         self.assertEqual(
1405             self.base_peer4_in_err + 1,
1406             self.statistics.get_err_counter(self.peer4_in_err),
1407         )
1408
1409         # send a valid handsake init for which we expect a response
1410         p = peer_1.mk_handshake(self.pg1)
1411
1412         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1413
1414         peer_1.consume_response(rx[0])
1415
1416         # route a packet into the wg interface
1417         #  this is dropped because the peer is still not initiated
1418         p = (
1419             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1420             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1421             / UDP(sport=555, dport=556)
1422             / Raw()
1423         )
1424         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1425         self.assertEqual(
1426             self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
1427         )
1428
1429         # send a data packet from the peer through the tunnel
1430         # this completes the handshake
1431         p = (
1432             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1433             / UDP(sport=222, dport=223)
1434             / Raw()
1435         )
1436         d = peer_1.encrypt_transport(p)
1437         p = peer_1.mk_tunnel_header(self.pg1) / (
1438             Wireguard(message_type=4, reserved_zero=0)
1439             / WireguardTransport(
1440                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1441             )
1442         )
1443         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1444
1445         for rx in rxs:
1446             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1447             self.assertEqual(rx[IP].ttl, 19)
1448
1449         # send a packets that are routed into the tunnel
1450         p = (
1451             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1452             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1453             / UDP(sport=555, dport=556)
1454             / Raw(b"\x00" * 80)
1455         )
1456
1457         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1458
1459         for rx in rxs:
1460             rx = IP(peer_1.decrypt_transport(rx))
1461
1462             # check the original packet is present
1463             self.assertEqual(rx[IP].dst, p[IP].dst)
1464             self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
1465
1466         # send packets into the tunnel, expect to receive them on
1467         # the other side
1468         p = [
1469             (
1470                 peer_1.mk_tunnel_header(self.pg1)
1471                 / Wireguard(message_type=4, reserved_zero=0)
1472                 / WireguardTransport(
1473                     receiver_index=peer_1.sender,
1474                     counter=ii + 1,
1475                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1476                         (
1477                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1478                             / UDP(sport=222, dport=223)
1479                             / Raw()
1480                         )
1481                     ),
1482                 )
1483             )
1484             for ii in range(255)
1485         ]
1486
1487         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1488
1489         for rx in rxs:
1490             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1491             self.assertEqual(rx[IP].ttl, 19)
1492
1493         r1.remove_vpp_config()
1494         r2.remove_vpp_config()
1495         peer_1.remove_vpp_config()
1496         wg0.remove_vpp_config()
1497
1498     def test_wg_peer_v6o6(self):
1499         """Test v6o6"""
1500
1501         port = 12343
1502
1503         # Create interfaces
1504         wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1505         wg0.admin_up()
1506         wg0.config_ip6()
1507
1508         peer_1 = VppWgPeer(
1509             self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
1510         ).add_vpp_config()
1511         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1512
1513         r1 = VppIpRoute(
1514             self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
1515         ).add_vpp_config()
1516         r2 = VppIpRoute(
1517             self, "22::3:0", 112, [VppRoutePath("22::3:1", wg0.sw_if_index)]
1518         ).add_vpp_config()
1519
1520         # route a packet into the wg interface
1521         #  use the allowed-ip prefix
1522         #  this is dropped because the peer is not initiated
1523
1524         p = (
1525             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1526             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1527             / UDP(sport=555, dport=556)
1528             / Raw()
1529         )
1530         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1531
1532         self.assertEqual(
1533             self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
1534         )
1535
1536         # route a packet into the wg interface
1537         #  use a not allowed-ip prefix
1538         #  this is dropped because there is no matching peer
1539         p = (
1540             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1541             / IPv6(src=self.pg0.remote_ip6, dst="22::3:2")
1542             / UDP(sport=555, dport=556)
1543             / Raw()
1544         )
1545         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1546         self.assertEqual(
1547             self.base_peer6_out_err + 1,
1548             self.statistics.get_err_counter(self.peer6_out_err),
1549         )
1550
1551         # send a handsake from the peer with an invalid MAC
1552         p = peer_1.mk_handshake(self.pg1, True)
1553         p[WireguardInitiation].mac1 = b"foobar"
1554         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1555
1556         self.assertEqual(
1557             self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
1558         )
1559
1560         # send a handsake from the peer but signed by the wrong key.
1561         p = peer_1.mk_handshake(
1562             self.pg1, True, X25519PrivateKey.generate().public_key()
1563         )
1564         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1565         self.assertEqual(
1566             self.base_peer6_in_err + 1,
1567             self.statistics.get_err_counter(self.peer6_in_err),
1568         )
1569
1570         # send a valid handsake init for which we expect a response
1571         p = peer_1.mk_handshake(self.pg1, True)
1572
1573         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1574
1575         peer_1.consume_response(rx[0], True)
1576
1577         # route a packet into the wg interface
1578         #  this is dropped because the peer is still not initiated
1579         p = (
1580             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1581             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1582             / UDP(sport=555, dport=556)
1583             / Raw()
1584         )
1585         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1586         self.assertEqual(
1587             self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
1588         )
1589
1590         # send a data packet from the peer through the tunnel
1591         # this completes the handshake
1592         p = (
1593             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1594             / UDP(sport=222, dport=223)
1595             / Raw()
1596         )
1597         d = peer_1.encrypt_transport(p)
1598         p = peer_1.mk_tunnel_header(self.pg1, True) / (
1599             Wireguard(message_type=4, reserved_zero=0)
1600             / WireguardTransport(
1601                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1602             )
1603         )
1604         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1605
1606         for rx in rxs:
1607             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1608             self.assertEqual(rx[IPv6].hlim, 19)
1609
1610         # send a packets that are routed into the tunnel
1611         p = (
1612             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1613             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1614             / UDP(sport=555, dport=556)
1615             / Raw(b"\x00" * 80)
1616         )
1617
1618         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1619
1620         for rx in rxs:
1621             rx = IPv6(peer_1.decrypt_transport(rx, True))
1622
1623             # check the original packet is present
1624             self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
1625             self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1)
1626
1627         # send packets into the tunnel, expect to receive them on
1628         # the other side
1629         p = [
1630             (
1631                 peer_1.mk_tunnel_header(self.pg1, True)
1632                 / Wireguard(message_type=4, reserved_zero=0)
1633                 / WireguardTransport(
1634                     receiver_index=peer_1.sender,
1635                     counter=ii + 1,
1636                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1637                         (
1638                             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1639                             / UDP(sport=222, dport=223)
1640                             / Raw()
1641                         )
1642                     ),
1643                 )
1644             )
1645             for ii in range(255)
1646         ]
1647
1648         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1649
1650         for rx in rxs:
1651             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1652             self.assertEqual(rx[IPv6].hlim, 19)
1653
1654         r1.remove_vpp_config()
1655         r2.remove_vpp_config()
1656         peer_1.remove_vpp_config()
1657         wg0.remove_vpp_config()
1658
1659     def test_wg_peer_v6o4(self):
1660         """Test v6o4"""
1661
1662         port = 12353
1663
1664         # Create interfaces
1665         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1666         wg0.admin_up()
1667         wg0.config_ip6()
1668
1669         peer_1 = VppWgPeer(
1670             self, wg0, self.pg1.remote_ip4, port + 1, ["1::3:0/112"]
1671         ).add_vpp_config()
1672         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1673
1674         r1 = VppIpRoute(
1675             self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
1676         ).add_vpp_config()
1677
1678         # route a packet into the wg interface
1679         #  use the allowed-ip prefix
1680         #  this is dropped because the peer is not initiated
1681         p = (
1682             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1683             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1684             / UDP(sport=555, dport=556)
1685             / Raw()
1686         )
1687         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1688         self.assertEqual(
1689             self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
1690         )
1691
1692         # send a handsake from the peer with an invalid MAC
1693         p = peer_1.mk_handshake(self.pg1)
1694         p[WireguardInitiation].mac1 = b"foobar"
1695         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1696
1697         self.assertEqual(
1698             self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
1699         )
1700
1701         # send a handsake from the peer but signed by the wrong key.
1702         p = peer_1.mk_handshake(
1703             self.pg1, False, X25519PrivateKey.generate().public_key()
1704         )
1705         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1706         self.assertEqual(
1707             self.base_peer4_in_err + 1,
1708             self.statistics.get_err_counter(self.peer4_in_err),
1709         )
1710
1711         # send a valid handsake init for which we expect a response
1712         p = peer_1.mk_handshake(self.pg1)
1713
1714         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1715
1716         peer_1.consume_response(rx[0])
1717
1718         # route a packet into the wg interface
1719         #  this is dropped because the peer is still not initiated
1720         p = (
1721             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1722             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1723             / UDP(sport=555, dport=556)
1724             / Raw()
1725         )
1726         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1727         self.assertEqual(
1728             self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
1729         )
1730
1731         # send a data packet from the peer through the tunnel
1732         # this completes the handshake
1733         p = (
1734             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1735             / UDP(sport=222, dport=223)
1736             / Raw()
1737         )
1738         d = peer_1.encrypt_transport(p)
1739         p = peer_1.mk_tunnel_header(self.pg1) / (
1740             Wireguard(message_type=4, reserved_zero=0)
1741             / WireguardTransport(
1742                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1743             )
1744         )
1745         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1746
1747         for rx in rxs:
1748             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1749             self.assertEqual(rx[IPv6].hlim, 19)
1750
1751         # send a packets that are routed into the tunnel
1752         p = (
1753             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1754             / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
1755             / UDP(sport=555, dport=556)
1756             / Raw(b"\x00" * 80)
1757         )
1758
1759         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1760
1761         for rx in rxs:
1762             rx = IPv6(peer_1.decrypt_transport(rx))
1763
1764             # check the original packet is present
1765             self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
1766             self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1)
1767
1768         # send packets into the tunnel, expect to receive them on
1769         # the other side
1770         p = [
1771             (
1772                 peer_1.mk_tunnel_header(self.pg1)
1773                 / Wireguard(message_type=4, reserved_zero=0)
1774                 / WireguardTransport(
1775                     receiver_index=peer_1.sender,
1776                     counter=ii + 1,
1777                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1778                         (
1779                             IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
1780                             / UDP(sport=222, dport=223)
1781                             / Raw()
1782                         )
1783                     ),
1784                 )
1785             )
1786             for ii in range(255)
1787         ]
1788
1789         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1790
1791         for rx in rxs:
1792             self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
1793             self.assertEqual(rx[IPv6].hlim, 19)
1794
1795         r1.remove_vpp_config()
1796         peer_1.remove_vpp_config()
1797         wg0.remove_vpp_config()
1798
1799     def test_wg_peer_v4o6(self):
1800         """Test v4o6"""
1801
1802         port = 12363
1803
1804         # Create interfaces
1805         wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
1806         wg0.admin_up()
1807         wg0.config_ip4()
1808
1809         peer_1 = VppWgPeer(
1810             self, wg0, self.pg1.remote_ip6, port + 1, ["10.11.3.0/24"]
1811         ).add_vpp_config()
1812         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
1813
1814         r1 = VppIpRoute(
1815             self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
1816         ).add_vpp_config()
1817
1818         # route a packet into the wg interface
1819         #  use the allowed-ip prefix
1820         #  this is dropped because the peer is not initiated
1821         p = (
1822             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1823             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1824             / UDP(sport=555, dport=556)
1825             / Raw()
1826         )
1827         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1828         self.assertEqual(
1829             self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
1830         )
1831
1832         # send a handsake from the peer with an invalid MAC
1833         p = peer_1.mk_handshake(self.pg1, True)
1834         p[WireguardInitiation].mac1 = b"foobar"
1835         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1836         self.assertEqual(
1837             self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
1838         )
1839
1840         # send a handsake from the peer but signed by the wrong key.
1841         p = peer_1.mk_handshake(
1842             self.pg1, True, X25519PrivateKey.generate().public_key()
1843         )
1844         self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
1845         self.assertEqual(
1846             self.base_peer6_in_err + 1,
1847             self.statistics.get_err_counter(self.peer6_in_err),
1848         )
1849
1850         # send a valid handsake init for which we expect a response
1851         p = peer_1.mk_handshake(self.pg1, True)
1852
1853         rx = self.send_and_expect(self.pg1, [p], self.pg1)
1854
1855         peer_1.consume_response(rx[0], True)
1856
1857         # route a packet into the wg interface
1858         #  this is dropped because the peer is still not initiated
1859         p = (
1860             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1861             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1862             / UDP(sport=555, dport=556)
1863             / Raw()
1864         )
1865         self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
1866         self.assertEqual(
1867             self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
1868         )
1869
1870         # send a data packet from the peer through the tunnel
1871         # this completes the handshake
1872         p = (
1873             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1874             / UDP(sport=222, dport=223)
1875             / Raw()
1876         )
1877         d = peer_1.encrypt_transport(p)
1878         p = peer_1.mk_tunnel_header(self.pg1, True) / (
1879             Wireguard(message_type=4, reserved_zero=0)
1880             / WireguardTransport(
1881                 receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
1882             )
1883         )
1884         rxs = self.send_and_expect(self.pg1, [p], self.pg0)
1885
1886         for rx in rxs:
1887             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1888             self.assertEqual(rx[IP].ttl, 19)
1889
1890         # send a packets that are routed into the tunnel
1891         p = (
1892             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1893             / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
1894             / UDP(sport=555, dport=556)
1895             / Raw(b"\x00" * 80)
1896         )
1897
1898         rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
1899
1900         for rx in rxs:
1901             rx = IP(peer_1.decrypt_transport(rx, True))
1902
1903             # check the original packet is present
1904             self.assertEqual(rx[IP].dst, p[IP].dst)
1905             self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
1906
1907         # send packets into the tunnel, expect to receive them on
1908         # the other side
1909         p = [
1910             (
1911                 peer_1.mk_tunnel_header(self.pg1, True)
1912                 / Wireguard(message_type=4, reserved_zero=0)
1913                 / WireguardTransport(
1914                     receiver_index=peer_1.sender,
1915                     counter=ii + 1,
1916                     encrypted_encapsulated_packet=peer_1.encrypt_transport(
1917                         (
1918                             IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
1919                             / UDP(sport=222, dport=223)
1920                             / Raw()
1921                         )
1922                     ),
1923                 )
1924             )
1925             for ii in range(255)
1926         ]
1927
1928         rxs = self.send_and_expect(self.pg1, p, self.pg0)
1929
1930         for rx in rxs:
1931             self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
1932             self.assertEqual(rx[IP].ttl, 19)
1933
1934         r1.remove_vpp_config()
1935         peer_1.remove_vpp_config()
1936         wg0.remove_vpp_config()
1937
1938     def test_wg_multi_peer(self):
1939         """multiple peer setup"""
1940         port = 12373
1941
1942         # Create interfaces
1943         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
1944         wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config()
1945         wg0.admin_up()
1946         wg1.admin_up()
1947
1948         # Check peer counter
1949         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 0)
1950
1951         self.pg_enable_capture(self.pg_interfaces)
1952         self.pg_start()
1953
1954         # Create many peers on sencond interface
1955         NUM_PEERS = 16
1956         self.pg2.generate_remote_hosts(NUM_PEERS)
1957         self.pg2.configure_ipv4_neighbors()
1958         self.pg1.generate_remote_hosts(NUM_PEERS)
1959         self.pg1.configure_ipv4_neighbors()
1960
1961         peers_1 = []
1962         peers_2 = []
1963         routes_1 = []
1964         routes_2 = []
1965         for i in range(NUM_PEERS):
1966             peers_1.append(
1967                 VppWgPeer(
1968                     self,
1969                     wg0,
1970                     self.pg1.remote_hosts[i].ip4,
1971                     port + 1 + i,
1972                     ["10.0.%d.4/32" % i],
1973                 ).add_vpp_config()
1974             )
1975             routes_1.append(
1976                 VppIpRoute(
1977                     self,
1978                     "10.0.%d.4" % i,
1979                     32,
1980                     [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)],
1981                 ).add_vpp_config()
1982             )
1983
1984             peers_2.append(
1985                 VppWgPeer(
1986                     self,
1987                     wg1,
1988                     self.pg2.remote_hosts[i].ip4,
1989                     port + 100 + i,
1990                     ["10.100.%d.4/32" % i],
1991                 ).add_vpp_config()
1992             )
1993             routes_2.append(
1994                 VppIpRoute(
1995                     self,
1996                     "10.100.%d.4" % i,
1997                     32,
1998                     [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)],
1999                 ).add_vpp_config()
2000             )
2001
2002         self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2)
2003
2004         self.logger.info(self.vapi.cli("show wireguard peer"))
2005         self.logger.info(self.vapi.cli("show wireguard interface"))
2006         self.logger.info(self.vapi.cli("show adj 37"))
2007         self.logger.info(self.vapi.cli("sh ip fib 172.16.3.17"))
2008         self.logger.info(self.vapi.cli("sh ip fib 10.11.3.0"))
2009
2010         # remove routes
2011         for r in routes_1:
2012             r.remove_vpp_config()
2013         for r in routes_2:
2014             r.remove_vpp_config()
2015
2016         # remove peers
2017         for p in peers_1:
2018             self.assertTrue(p.query_vpp_config())
2019             p.remove_vpp_config()
2020         for p in peers_2:
2021             self.assertTrue(p.query_vpp_config())
2022             p.remove_vpp_config()
2023
2024         wg0.remove_vpp_config()
2025         wg1.remove_vpp_config()
2026
2027     def test_wg_multi_interface(self):
2028         """Multi-tunnel on the same port"""
2029         port = 12500
2030
2031         # Create many wireguard interfaces
2032         NUM_IFS = 4
2033         self.pg1.generate_remote_hosts(NUM_IFS)
2034         self.pg1.configure_ipv4_neighbors()
2035         self.pg0.generate_remote_hosts(NUM_IFS)
2036         self.pg0.configure_ipv4_neighbors()
2037
2038         # Create interfaces with a peer on each
2039         peers = []
2040         routes = []
2041         wg_ifs = []
2042         for i in range(NUM_IFS):
2043             # Use the same port for each interface
2044             wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2045             wg0.admin_up()
2046             wg0.config_ip4()
2047             wg_ifs.append(wg0)
2048             peers.append(
2049                 VppWgPeer(
2050                     self,
2051                     wg0,
2052                     self.pg1.remote_hosts[i].ip4,
2053                     port + 1 + i,
2054                     ["10.0.%d.0/24" % i],
2055                 ).add_vpp_config()
2056             )
2057
2058             routes.append(
2059                 VppIpRoute(
2060                     self,
2061                     "10.0.%d.0" % i,
2062                     24,
2063                     [VppRoutePath("10.0.%d.4" % i, wg0.sw_if_index)],
2064                 ).add_vpp_config()
2065             )
2066
2067         self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_IFS)
2068
2069         for i in range(NUM_IFS):
2070             # send a valid handsake init for which we expect a response
2071             p = peers[i].mk_handshake(self.pg1)
2072             rx = self.send_and_expect(self.pg1, [p], self.pg1)
2073             peers[i].consume_response(rx[0])
2074
2075             # send a data packet from the peer through the tunnel
2076             # this completes the handshake
2077             p = (
2078                 IP(src="10.0.%d.4" % i, dst=self.pg0.remote_hosts[i].ip4, ttl=20)
2079                 / UDP(sport=222, dport=223)
2080                 / Raw()
2081             )
2082             d = peers[i].encrypt_transport(p)
2083             p = peers[i].mk_tunnel_header(self.pg1) / (
2084                 Wireguard(message_type=4, reserved_zero=0)
2085                 / WireguardTransport(
2086                     receiver_index=peers[i].sender,
2087                     counter=0,
2088                     encrypted_encapsulated_packet=d,
2089                 )
2090             )
2091             rxs = self.send_and_expect(self.pg1, [p], self.pg0)
2092             for rx in rxs:
2093                 self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4)
2094                 self.assertEqual(rx[IP].ttl, 19)
2095
2096         # send a packets that are routed into the tunnel
2097         for i in range(NUM_IFS):
2098             p = (
2099                 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2100                 / IP(src=self.pg0.remote_hosts[i].ip4, dst="10.0.%d.4" % i)
2101                 / UDP(sport=555, dport=556)
2102                 / Raw(b"\x00" * 80)
2103             )
2104
2105             rxs = self.send_and_expect(self.pg0, p * 64, self.pg1)
2106
2107             for rx in rxs:
2108                 rx = IP(peers[i].decrypt_transport(rx))
2109
2110                 # check the oringial packet is present
2111                 self.assertEqual(rx[IP].dst, p[IP].dst)
2112                 self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
2113
2114         # send packets into the tunnel
2115         for i in range(NUM_IFS):
2116             p = [
2117                 (
2118                     peers[i].mk_tunnel_header(self.pg1)
2119                     / Wireguard(message_type=4, reserved_zero=0)
2120                     / WireguardTransport(
2121                         receiver_index=peers[i].sender,
2122                         counter=ii + 1,
2123                         encrypted_encapsulated_packet=peers[i].encrypt_transport(
2124                             (
2125                                 IP(
2126                                     src="10.0.%d.4" % i,
2127                                     dst=self.pg0.remote_hosts[i].ip4,
2128                                     ttl=20,
2129                                 )
2130                                 / UDP(sport=222, dport=223)
2131                                 / Raw()
2132                             )
2133                         ),
2134                     )
2135                 )
2136                 for ii in range(64)
2137             ]
2138
2139             rxs = self.send_and_expect(self.pg1, p, self.pg0)
2140
2141             for rx in rxs:
2142                 self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4)
2143                 self.assertEqual(rx[IP].ttl, 19)
2144
2145         for r in routes:
2146             r.remove_vpp_config()
2147         for p in peers:
2148             p.remove_vpp_config()
2149         for i in wg_ifs:
2150             i.remove_vpp_config()
2151
2152     def test_wg_event(self):
2153         """Test events"""
2154         port = 12600
2155         ESTABLISHED_FLAG = (
2156             VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_ESTABLISHED
2157         )
2158         DEAD_FLAG = VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_STATUS_DEAD
2159
2160         # Create interfaces
2161         wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
2162         wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config()
2163         wg0.admin_up()
2164         wg1.admin_up()
2165
2166         # Check peer counter
2167         self.assertEqual(len(self.vapi.wireguard_peers_dump()), 0)
2168
2169         self.pg_enable_capture(self.pg_interfaces)
2170         self.pg_start()
2171
2172         # Create peers
2173         NUM_PEERS = 2
2174         self.pg2.generate_remote_hosts(NUM_PEERS)
2175         self.pg2.configure_ipv4_neighbors()
2176         self.pg1.generate_remote_hosts(NUM_PEERS)
2177         self.pg1.configure_ipv4_neighbors()
2178
2179         peers_0 = []
2180         peers_1 = []
2181         routes_0 = []
2182         routes_1 = []
2183         for i in range(NUM_PEERS):
2184             peers_0.append(
2185                 VppWgPeer(
2186                     self,
2187                     wg0,
2188                     self.pg1.remote_hosts[i].ip4,
2189                     port + 1 + i,
2190                     ["10.0.%d.4/32" % i],
2191                 ).add_vpp_config()
2192             )
2193             routes_0.append(
2194                 VppIpRoute(
2195                     self,
2196                     "10.0.%d.4" % i,
2197                     32,
2198                     [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)],
2199                 ).add_vpp_config()
2200             )
2201
2202             peers_1.append(
2203                 VppWgPeer(
2204                     self,
2205                     wg1,
2206                     self.pg2.remote_hosts[i].ip4,
2207                     port + 100 + i,
2208                     ["10.100.%d.4/32" % i],
2209                 ).add_vpp_config()
2210             )
2211             routes_1.append(
2212                 VppIpRoute(
2213                     self,
2214                     "10.100.%d.4" % i,
2215                     32,
2216                     [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)],
2217                 ).add_vpp_config()
2218             )
2219
2220         self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2)
2221
2222         # Want events from the first perr of wg0
2223         # and from all wg1 peers
2224         peers_0[0].want_events()
2225         wg1.want_events()
2226
2227         for i in range(NUM_PEERS):
2228             # send a valid handsake init for which we expect a response
2229             p = peers_0[i].mk_handshake(self.pg1)
2230             rx = self.send_and_expect(self.pg1, [p], self.pg1)
2231             peers_0[i].consume_response(rx[0])
2232             if i == 0:
2233                 peers_0[0].wait_event(ESTABLISHED_FLAG)
2234
2235             p = peers_1[i].mk_handshake(self.pg2)
2236             rx = self.send_and_expect(self.pg2, [p], self.pg2)
2237             peers_1[i].consume_response(rx[0])
2238
2239         wg1.wait_events(ESTABLISHED_FLAG, [peers_1[0].index, peers_1[1].index])
2240
2241         # remove routes
2242         for r in routes_0:
2243             r.remove_vpp_config()
2244         for r in routes_1:
2245             r.remove_vpp_config()
2246
2247         # remove peers
2248         for i in range(NUM_PEERS):
2249             self.assertTrue(peers_0[i].query_vpp_config())
2250             peers_0[i].remove_vpp_config()
2251             if i == 0:
2252                 peers_0[i].wait_event(0)
2253                 peers_0[i].wait_event(DEAD_FLAG)
2254         for p in peers_1:
2255             self.assertTrue(p.query_vpp_config())
2256             p.remove_vpp_config()
2257             p.wait_event(0)
2258             p.wait_event(DEAD_FLAG)
2259
2260         wg0.remove_vpp_config()
2261         wg1.remove_vpp_config()
2262
2263
@tag_fixme_ubuntu2204
@tag_fixme_debian11
class WireguardHandoffTests(TestWg):
    """Wireguard Tests in multi worker setup"""

    # number of VPP worker threads requested for this test class
    vpp_worker_count = 2

    def test_wg_peer_init(self):
        """Handoff"""

        wg_port = 12383

        # Create the wg interface
        wg0 = VppWgInterface(self, self.pg1.local_ip4, wg_port).add_vpp_config()
        wg0.admin_up()
        wg0.config_ip4()

        peer_1 = VppWgPeer(
            self,
            wg0,
            self.pg1.remote_ip4,
            wg_port + 1,
            ["10.11.2.0/24", "10.11.3.0/24"],
        ).add_vpp_config()
        self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)

        wg_path = VppRoutePath("10.11.3.1", wg0.sw_if_index)
        r1 = VppIpRoute(self, "10.11.3.0", 24, [wg_path]).add_vpp_config()

        # send a valid handshake init for which we expect a response
        init = peer_1.mk_handshake(self.pg1)
        replies = self.send_and_expect(self.pg1, [init], self.pg1)
        peer_1.consume_response(replies[0])

        # send a data packet from the peer through the tunnel
        # this completes the handshake and pins the peer to worker 0
        inner = (
            IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
            / UDP(sport=222, dport=223)
            / Raw()
        )
        ciphertext = peer_1.encrypt_transport(inner)
        tunnel_hdr = peer_1.mk_tunnel_header(self.pg1)
        first_data = tunnel_hdr / (
            Wireguard(message_type=4, reserved_zero=0)
            / WireguardTransport(
                receiver_index=peer_1.sender,
                counter=0,
                encrypted_encapsulated_packet=ciphertext,
            )
        )
        for rx in self.send_and_expect(self.pg1, [first_data], self.pg0, worker=0):
            self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
            self.assertEqual(rx[IP].ttl, 19)

        # send packets that are routed into the tunnel
        # and pin the peer to worker 1
        pe = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
            / UDP(sport=555, dport=556)
            / Raw(b"\x00" * 80)
        )
        encapped = self.send_and_expect(self.pg0, pe * 255, self.pg1, worker=1)
        peer_1.validate_encapped(encapped, pe)

        # send packets into the tunnel, from the other worker; counter 0 was
        # used above, so the burst uses 1..255
        burst = []
        for seq in range(1, 256):
            tunnel_hdr = peer_1.mk_tunnel_header(self.pg1)
            wg_hdr = Wireguard(message_type=4, reserved_zero=0)
            inner = (
                IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
                / UDP(sport=222, dport=223)
                / Raw()
            )
            burst.append(
                tunnel_hdr
                / wg_hdr
                / WireguardTransport(
                    receiver_index=peer_1.sender,
                    counter=seq,
                    encrypted_encapsulated_packet=peer_1.encrypt_transport(inner),
                )
            )

        for rx in self.send_and_expect(self.pg1, burst, self.pg0, worker=1):
            self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
            self.assertEqual(rx[IP].ttl, 19)

        # send the routed packets into the tunnel once more, this time
        # from worker 0
        encapped = self.send_and_expect(self.pg0, pe * 255, self.pg1, worker=0)
        peer_1.validate_encapped(encapped, pe)

        # Tear down in reverse order of creation
        r1.remove_vpp_config()
        peer_1.remove_vpp_config()
        wg0.remove_vpp_config()

    # Overrides the test of the same name defined earlier in this file with a
    # skipped, empty method, so it does not run for this class.
    @unittest.skip("test disabled")
    def test_wg_multi_interface(self):
        """Multi-tunnel on the same port"""
2367
2368
class TestWgFIB(VppTestCase):
    """Wireguard FIB Test Case"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        # two pg interfaces, both up and with IPv4 configured
        self.create_pg_interfaces(range(2))

        for pg_if in self.pg_interfaces:
            pg_if.admin_up()
            pg_if.config_ip4()

    def tearDown(self):
        # undo setUp before handing over to the base class
        for pg_if in self.pg_interfaces:
            pg_if.unconfig_ip4()
            pg_if.admin_down()
        super().tearDown()

    def test_wg_fib_tracking(self):
        """FIB tracking"""
        wg_port = 12323

        # create wg interface
        wg0 = VppWgInterface(self, self.pg1.local_ip4, wg_port).add_vpp_config()
        wg0.admin_up()
        wg0.config_ip4()

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # create a peer
        peer_1 = VppWgPeer(
            self, wg0, self.pg1.remote_ip4, wg_port + 1, ["10.11.3.0/24"]
        ).add_vpp_config()
        self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)

        # create a route to rewrite traffic into the wg interface
        wg_path = VppRoutePath("10.11.3.1", wg0.sw_if_index)
        r1 = VppIpRoute(self, "10.11.3.0", 24, [wg_path]).add_vpp_config()

        # resolve ARP and expect the adjacency to update
        self.pg1.resolve_arp()

        # wait for the peer to send a handshake initiation; two packets are
        # captured on pg1 and the second one is consumed as the initiation
        captured = self.pg1.get_capture(2, timeout=6)

        # prepare and send a handshake response
        # expect a keepalive message
        response = peer_1.consume_init(captured[1], self.pg1)
        replies = self.send_and_expect(self.pg1, [response], self.pg1)

        # verify the keepalive message: it decrypts to an empty payload
        keepalive = peer_1.decrypt_transport(replies[0])
        self.assertEqual(0, len(keepalive))

        # prepare and send a packet that will be rewritten into the wg interface
        # expect a data packet sent to the new endpoint
        routed = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
            / UDP(sport=555, dport=556)
            / Raw()
        )
        encapped = self.send_and_expect(self.pg0, [routed], self.pg1)

        # verify the data packet
        peer_1.validate_encapped(encapped, routed)

        # remove configs
        r1.remove_vpp_config()
        peer_1.remove_vpp_config()
        wg0.remove_vpp_config()