c7ea79abfd01a5a6e3189450cea9480b89b9e6fd
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python3
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 from collections import namedtuple
8 import hashlib
9 import ipaddress
10 import reprlib
11 import time
12 import unittest
13 from random import randint, shuffle, getrandbits
14 from socket import AF_INET, AF_INET6, inet_ntop
15 from struct import pack, unpack
16
17 import scapy.compat
18 from scapy.layers.inet import UDP, IP
19 from scapy.layers.inet6 import IPv6
20 from scapy.layers.l2 import Ether, GRE
21 from scapy.packet import Raw
22
23 from config import config
24 from bfd import (
25     VppBFDAuthKey,
26     BFD,
27     BFDAuthType,
28     VppBFDUDPSession,
29     BFDDiagCode,
30     BFDState,
31     BFD_vpp_echo,
32 )
33 from framework import tag_fixme_vpp_workers
34 from framework import VppTestCase, VppTestRunner
35 from framework import tag_run_solo
36 from util import ppp
37 from vpp_ip import DpoProto
38 from vpp_ip_route import VppIpRoute, VppRoutePath
39 from vpp_lo_interface import VppLoInterface
40 from vpp_papi_provider import UnexpectedApiReturnValueError, CliFailedCommandError
41 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
42 from vpp_gre_interface import VppGreInterface
43 from vpp_papi import VppEnum
44
# Number of microseconds in one second.
USEC_IN_SEC = 1_000_000
46
47
class AuthKeyFactory(object):
    """Factory class for creating auth keys with unique conf key ID.

    Uniqueness is tracked per factory instance: a conf key ID handed out by
    one instance is never handed out again by that same instance.
    """

    def __init__(self):
        # conf key IDs this factory has already handed out
        self._conf_key_ids = set()

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """Create a random key with a unique conf key ID.

        :param test: test case instance, passed through to VppBFDAuthKey
        :param auth_type: authentication type of the created key
        :returns: VppBFDAuthKey holding 1 to 20 random key bytes
        """
        # re-draw until we hit an ID this factory has not used yet
        conf_key_id = randint(0, 0xFFFFFFFF)
        while conf_key_id in self._conf_key_ids:
            conf_key_id = randint(0, 0xFFFFFFFF)
        self._conf_key_ids.add(conf_key_id)
        key_length = randint(1, 20)
        key = scapy.compat.raw(
            bytearray([randint(0, 255) for _ in range(key_length)])
        )
        return VppBFDAuthKey(
            test=test, auth_type=auth_type, conf_key_id=conf_key_id, key=key
        )
66
67
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API"""

    pg0 = None
    pg1 = None

    @classmethod
    def setUpClass(cls):
        super(BFDAPITestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces(range(2))
            for pg_if in cls.pg_interfaces:
                pg_if.config_ip4()
                pg_if.config_ip6()
                pg_if.resolve_arp()
        except Exception:
            # interface setup failed - run the parent's class teardown
            # before letting the error propagate
            super(BFDAPITestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFDAPITestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDAPITestCase, self).setUp()
        self.factory = AuthKeyFactory()

    def _add_and_remove_twice(self, session):
        """Add the session to VPP and remove it again, two times in a row."""
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def _assert_dump_matches(self, session):
        """Compare the session's VPP dump entry against the session object."""
        entry = session.get_bfd_udp_session_dump_entry()
        self.assert_equal(
            session.desired_min_tx,
            entry.desired_min_tx,
            "desired min transmit interval",
        )
        self.assert_equal(
            session.required_min_rx,
            entry.required_min_rx,
            "required min receive interval",
        )
        self.assert_equal(session.detect_mult, entry.detect_mult, "detect mult")

    def _assert_echo_source(self, sw_if_index=None, ip4=None, ip6=None):
        """Query the echo source from VPP and check the reply.

        :param sw_if_index: expected interface index, or None when no echo
            source is expected to be set
        :param ip4: expected packed IPv4 echo address, or None when no
            usable IPv4 address is expected
        :param ip6: expected packed IPv6 echo address, or None when no
            usable IPv6 address is expected
        """
        reply = self.vapi.bfd_udp_get_echo_source()
        if sw_if_index is None:
            self.assertFalse(reply.is_set)
        else:
            self.assertTrue(reply.is_set)
            self.assertEqual(reply.sw_if_index, sw_if_index)
        if ip4 is None:
            self.assertFalse(reply.have_usable_ip4)
        else:
            self.assertTrue(reply.have_usable_ip4)
            self.assertEqual(reply.ip4_addr.packed, ip4)
        if ip6 is None:
            self.assertFalse(reply.have_usable_ip6)
        else:
            self.assertTrue(reply.have_usable_ip6)
            self.assertEqual(reply.ip6_addr.packed, ip6)

    def test_add_bfd(self):
        """create a BFD session"""
        self._add_and_remove_twice(
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        )

    def test_double_add(self):
        """create the same BFD session twice (negative case)"""
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()

        # the second add of an identical session must be rejected
        with self.vapi.assert_negative_api_retval():
            session.add_vpp_config()

        session.remove_vpp_config()

    def test_add_bfd6(self):
        """create IPv6 BFD session"""
        self._add_and_remove_twice(
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        )

    def test_mod_bfd(self):
        """modify BFD session parameters"""
        session = VppBFDUDPSession(
            self,
            self.pg0,
            self.pg0.remote_ip4,
            desired_min_tx=50000,
            required_min_rx=10000,
            detect_mult=1,
        )
        session.add_vpp_config()
        self._assert_dump_matches(session)
        # double every parameter and check that the dump follows
        session.modify_parameters(
            desired_min_tx=session.desired_min_tx * 2,
            required_min_rx=session.required_min_rx * 2,
            detect_mult=session.detect_mult * 2,
        )
        self._assert_dump_matches(session)

    def test_upd_bfd(self):
        """Create/Modify w/ Update BFD session parameters"""
        session = VppBFDUDPSession(
            self,
            self.pg0,
            self.pg0.remote_ip4,
            desired_min_tx=50000,
            required_min_rx=10000,
            detect_mult=1,
        )
        # first update call creates the session
        session.upd_vpp_config()
        self._assert_dump_matches(session)
        # second update call modifies it
        session.upd_vpp_config(
            desired_min_tx=session.desired_min_tx * 2,
            required_min_rx=session.required_min_rx * 2,
            detect_mult=session.detect_mult * 2,
        )
        self._assert_dump_matches(session)

    def test_add_sha1_keys(self):
        """add SHA1 keys"""
        key_count = 10
        keys = [self.factory.create_random_key(self) for _ in range(key_count)]
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        # remove randomly
        removal_order = list(range(key_count))
        shuffle(removal_order)
        removed = []
        for victim in removal_order:
            keys[victim].remove_vpp_config()
            removed.append(victim)
            # after each removal, exactly the removed keys must be gone
            for position, candidate in enumerate(keys):
                if position in removed:
                    self.assertFalse(candidate.query_vpp_config())
                else:
                    self.assertTrue(candidate.query_vpp_config())
        # should be removed now
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        # add back and remove again
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        for key in keys:
            key.remove_vpp_config()
        for key in keys:
            self.assertFalse(key.query_vpp_config())

    def test_add_bfd_sha1(self):
        """create a BFD session (SHA1)"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self._add_and_remove_twice(
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        )

    def test_double_add_sha1(self):
        """create the same BFD session twice (negative case) (SHA1)"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
        session.add_vpp_config()
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_add_auth_nonexistent_key(self):
        """create BFD session using non-existent SHA1 (negative case)"""
        # the key is created on the test side only, never added to VPP
        unknown_key = self.factory.create_random_key(self)
        session = VppBFDUDPSession(
            self,
            self.pg0,
            self.pg0.remote_ip4,
            sha1_key=unknown_key,
        )
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_shared_sha1_key(self):
        """single SHA1 key shared by multiple BFD sessions"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        sessions = [
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=key),
            VppBFDUDPSession(
                self, self.pg0, self.pg0.remote_ip6, sha1_key=key, af=AF_INET6
            ),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4, sha1_key=key),
            VppBFDUDPSession(
                self, self.pg1, self.pg1.remote_ip6, sha1_key=key, af=AF_INET6
            ),
        ]
        for s in sessions:
            s.add_vpp_config()
        # the use count must drop by one with every removed session
        remaining = len(sessions)
        for s in sessions:
            entry = key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(entry.use_count, remaining, "Use count for shared key")
            s.remove_vpp_config()
            remaining -= 1
        entry = key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(entry.use_count, remaining, "Use count for shared key")

    def test_activate_auth(self):
        """activate SHA1 authentication"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)

    def test_deactivate_auth(self):
        """deactivate SHA1 authentication"""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)
        session.deactivate_auth()

    def test_change_key(self):
        """change SHA1 key"""
        first_key = self.factory.create_random_key(self)
        second_key = self.factory.create_random_key(self)
        while second_key.conf_key_id == first_key.conf_key_id:
            second_key = self.factory.create_random_key(self)
        first_key.add_vpp_config()
        second_key.add_vpp_config()
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=first_key
        )
        session.add_vpp_config()
        session.activate_auth(second_key)

    def test_set_del_udp_echo_source(self):
        """set/del udp echo source"""
        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        # nothing configured yet
        self._assert_echo_source()

        # echo source set, but the loopback has no addresses so far
        self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
        self._assert_echo_source(sw_if_index=self.loopback0.sw_if_index)

        # expected echo address is the interface address with the lowest
        # bit flipped
        self.loopback0.config_ip4()
        local_ip4 = int(ipaddress.IPv4Address(self.loopback0.local_ip4))
        echo_ip4 = ipaddress.IPv4Address(local_ip4 ^ 1).packed
        self._assert_echo_source(
            sw_if_index=self.loopback0.sw_if_index, ip4=echo_ip4
        )

        self.loopback0.config_ip6()
        local_ip6 = int(ipaddress.IPv6Address(self.loopback0.local_ip6))
        echo_ip6 = ipaddress.IPv6Address(local_ip6 ^ 1).packed
        self._assert_echo_source(
            sw_if_index=self.loopback0.sw_if_index, ip4=echo_ip4, ip6=echo_ip6
        )

        # deleting the echo source brings us back to the initial state
        self.vapi.bfd_udp_del_echo_source()
        self._assert_echo_source()
367
368
class BFDTestSession(object):
    """BFD session as seen from test framework side"""

    def __init__(
        self,
        test,
        interface,
        af,
        detect_mult=3,
        sha1_key=None,
        bfd_key_id=None,
        our_seq_number=None,
        tunnel_header=None,
        phy_interface=None,
    ):
        self.test = test
        self.af = af
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.interface = interface
        # packets go out on the physical interface, which is the session
        # interface itself unless a separate one was given
        self.phy_interface = phy_interface if phy_interface else self.interface
        self.udp_sport = randint(49152, 65535)
        # start from a random sequence number unless the caller picked one
        self.our_seq_number = (
            randint(0, 40000000) if our_seq_number is None else our_seq_number
        )
        self.vpp_seq_number = None
        self.my_discriminator = 0
        self.desired_min_tx = 300000
        self.required_min_rx = 300000
        self.required_min_echo_rx = None
        self.detect_mult = detect_mult
        self.diag = BFDDiagCode.no_diagnostic
        self.your_discriminator = None
        self.state = BFDState.down
        self.auth_type = BFDAuthType.no_auth
        self.tunnel_header = tunnel_header
        # packet counters
        self.tx_packets = 0
        self.rx_packets = 0
        self.tx_packets_echo = 0
        self.rx_packets_echo = 0

    def inc_seq_num(self):
        """Advance our sequence number by one, wrapping 0xFFFFFFFF to 0."""
        at_maximum = self.our_seq_number == 0xFFFFFFFF
        self.our_seq_number = 0 if at_maximum else self.our_seq_number + 1

    def update(
        self,
        my_discriminator=None,
        your_discriminator=None,
        desired_min_tx=None,
        required_min_rx=None,
        required_min_echo_rx=None,
        detect_mult=None,
        diag=None,
        state=None,
        auth_type=None,
    ):
        """Update BFD parameters associated with session.

        Only parameters passed as something other than None are stored;
        the remaining attributes keep their current value.
        """
        requested = {
            "my_discriminator": my_discriminator,
            "your_discriminator": your_discriminator,
            "required_min_rx": required_min_rx,
            "required_min_echo_rx": required_min_echo_rx,
            "desired_min_tx": desired_min_tx,
            "detect_mult": detect_mult,
            "diag": diag,
            "state": state,
            "auth_type": auth_type,
        }
        for attribute, value in requested.items():
            if value is not None:
                setattr(self, attribute, value)

    def fill_packet_fields(self, packet):
        """Copy every truthy session value into the packet's BFD layer."""
        bfd = packet[BFD]
        # (BFD layer field, debug log format, session value)
        known_values = (
            (
                "my_discriminator",
                "BFD: setting packet.my_discriminator=%s",
                self.my_discriminator,
            ),
            (
                "your_discriminator",
                "BFD: setting packet.your_discriminator=%s",
                self.your_discriminator,
            ),
            (
                "required_min_rx_interval",
                "BFD: setting packet.required_min_rx_interval=%s",
                self.required_min_rx,
            ),
            (
                "required_min_echo_rx_interval",
                "BFD: setting packet.required_min_echo_rx=%s",
                self.required_min_echo_rx,
            ),
            (
                "desired_min_tx_interval",
                "BFD: setting packet.desired_min_tx_interval=%s",
                self.desired_min_tx,
            ),
            ("detect_mult", "BFD: setting packet.detect_mult=%s", self.detect_mult),
            ("diag", "BFD: setting packet.diag=%s", self.diag),
            ("state", "BFD: setting packet.state=%s", self.state),
            # this is used by a negative test-case
            ("auth_type", "BFD: setting packet.auth_type=%s", self.auth_type),
        )
        for field, log_format, value in known_values:
            # falsy values (None, 0) leave the packet field untouched
            if value:
                self.test.logger.debug(log_format, value)
                setattr(bfd, field, value)

    def _padded_key(self):
        """Return the SHA1 key padded with zero bytes up to 20 bytes."""
        secret = self.sha1_key.key
        return secret + b"\0" * (20 - len(secret))

    def create_packet(self):
        """Create a BFD packet, reflecting the current state of session."""
        if self.sha1_key:
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        else:
            bfd = BFD()
        packet = Ether(
            src=self.phy_interface.remote_mac, dst=self.phy_interface.local_mac
        )
        if self.tunnel_header:
            packet = packet / self.tunnel_header
        # addresses are from the peer's point of view: remote -> local
        if self.af == AF_INET6:
            ip_layer = IPv6(
                src=self.interface.remote_ip6,
                dst=self.interface.local_ip6,
                hlim=255,
            )
        else:
            ip_layer = IP(
                src=self.interface.remote_ip4, dst=self.interface.local_ip4, ttl=255
            )
        packet = (
            packet / ip_layer / UDP(sport=self.udp_sport, dport=BFD.udp_dport) / bfd
        )
        self.test.logger.debug("BFD: Creating packet")
        self.fill_packet_fields(packet)
        if self.sha1_key:
            # hash the first 32 bytes of the BFD layer followed by the
            # zero-padded key
            hash_material = scapy.compat.raw(packet[BFD])[:32] + self._padded_key()
            sha1 = hashlib.sha1(hash_material)
            self.test.logger.debug("BFD: Calculated SHA1 hash: %s", sha1.hexdigest())
            packet[BFD].auth_key_hash = sha1.digest()
        return packet

    def send_packet(self, packet=None, interface=None):
        """Send packet on interface, creating the packet if needed.

        :param packet: packet to send, built by create_packet() when None
        :param interface: interface to send on, phy_interface when None
        """
        if packet is None:
            packet = self.create_packet()
        if interface is None:
            interface = self.phy_interface
        self.test.logger.debug(ppp("Sending packet:", packet))
        interface.add_stream(packet)
        self.tx_packets += 1
        self.test.pg_start()

    def _verify_followup_seq_num(self, recvd_seq_num):
        """Check a sequence number against the previously received one."""
        self.test.logger.debug("Received followup sequence number: %s" % recvd_seq_num)
        previous = self.vpp_seq_number
        meticulous = self.sha1_key.auth_type == BFDAuthType.meticulous_keyed_sha1
        wraps = not previous < 0xFFFFFFFF
        if meticulous:
            # exactly one step ahead, wrapping to 0 after the maximum
            self.test.assert_equal(
                recvd_seq_num, 0 if wraps else previous + 1, "BFD sequence number"
            )
        elif wraps:
            self.test.assertIn(
                recvd_seq_num,
                (previous, 0),
                "BFD sequence number not one of (%s, 0)" % previous,
            )
        else:
            self.test.assert_in_range(
                recvd_seq_num, previous, previous + 1, "BFD sequence number"
            )
        self.vpp_seq_number = recvd_seq_num

    def verify_sha1_auth(self, packet):
        """Verify correctness of authentication in BFD layer."""
        bfd = packet[BFD]
        check_equal = self.test.assert_equal
        check_equal(bfd.auth_len, 28, "Auth section length")
        check_equal(bfd.auth_type, self.sha1_key.auth_type, BFDAuthType)
        check_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        check_equal(bfd.auth_reserved, 0, "Reserved")
        if self.vpp_seq_number is None:
            # first authenticated packet - just remember where VPP starts
            self.vpp_seq_number = bfd.auth_seq_num
            self.test.logger.debug(
                "Received initial sequence number: %s" % self.vpp_seq_number
            )
        else:
            self._verify_followup_seq_num(bfd.auth_seq_num)
        # last 20 bytes represent the hash - so replace them with the key,
        # pad the result with zeros and hash the result
        hash_material = bfd.original[:-20] + self._padded_key()
        expected_hash = hashlib.sha1(hash_material).hexdigest()
        check_equal(
            binascii.hexlify(bfd.auth_key_hash), expected_hash.encode(), "Auth key hash"
        )

    def verify_bfd(self, packet):
        """Verify correctness of BFD layer."""
        bfd = packet[BFD]
        self.test.assert_equal(bfd.version, 1, "BFD version")
        self.test.assert_equal(
            bfd.your_discriminator, self.my_discriminator, "BFD - your discriminator"
        )
        if self.sha1_key:
            self.verify_sha1_auth(packet)
618         if self.sha1_key:
619             self.verify_sha1_auth(packet)
620
621
def bfd_session_up(test):
    """Bring BFD session up.

    Waits for a packet from VPP, records the clock offset between the test
    and the packet timestamp, answers with Init, waits for the Up event and
    finally sends a packet in Up state.

    :param test: test case providing vpp_session, test_session, vapi and
        logger
    """

    def bump_seq_if_meticulous():
        # only meticulous keyed SHA1 gets a new sequence number per packet
        key = test.test_session.sha1_key
        if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
            test.test_session.inc_seq_num()

    test.logger.info("BFD: Waiting for slow hello")
    hello = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)

    # remember the previous offset (if any) to check clock stability
    previous_offset = getattr(test, "vpp_clock_offset", None)
    test.vpp_clock_offset = time.time() - float(hello.time)
    test.logger.debug("BFD: Calculated vpp clock offset: %s", test.vpp_clock_offset)
    if previous_offset:
        test.assertAlmostEqual(
            previous_offset,
            test.vpp_clock_offset,
            delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)"
            % (test.vpp_clock_offset, previous_offset),
        )

    test.logger.info("BFD: Sending Init")
    test.test_session.update(
        my_discriminator=randint(0, 40000000),
        your_discriminator=hello[BFD].my_discriminator,
        state=BFDState.init,
    )
    bump_seq_if_meticulous()
    test.test_session.send_packet()

    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_event")
    verify_event(test, event, expected_state=BFDState.up)
    test.logger.info("BFD: Session is Up")

    test.test_session.update(state=BFDState.up)
    bump_seq_if_meticulous()
    test.test_session.send_packet()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
663
664
def bfd_session_down(test):
    """Bring BFD session down.

    Requires the VPP session to be Up, sends a packet in Down state and
    waits for the matching Down event.

    :param test: test case providing vpp_session, test_session, vapi and
        logger
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    test.test_session.update(state=BFDState.down)

    # only meticulous keyed SHA1 gets a new sequence number per packet
    key = test.test_session.sha1_key
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        test.test_session.inc_seq_num()
    test.test_session.send_packet()

    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_event")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
680
681
def verify_bfd_session_config(test, session, state=None):
    """Verify that the session's dump entry matches the session object.

    :param test: test case providing the assert helpers
    :param session: session object to compare the dump entry against
    :param state: expected session state, not checked when falsy
    """
    dump = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(dump)
    # since dump is not none, we have verified that sw_if_index and addresses
    # are valid (in get_bfd_udp_session_dump_entry)
    if state:
        test.assert_equal(dump.state, state, "session state")
    compared_fields = (
        ("required_min_rx", "required min rx interval"),
        ("desired_min_tx", "desired min tx interval"),
        ("detect_mult", "detect multiplier"),
    )
    for field, label in compared_fields:
        test.assert_equal(getattr(dump, field), getattr(session, field), label)
    authenticated = session.sha1_key is not None
    test.assert_equal(
        dump.is_authenticated, 1 if authenticated else 0, "is_authenticated flag"
    )
    if authenticated:
        test.assert_equal(dump.bfd_key_id, session.bfd_key_id, "bfd key id")
        test.assert_equal(
            dump.conf_key_id, session.sha1_key.conf_key_id, "config key id"
        )
704
705
def verify_ip(test, packet):
    """Verify correctness of IP layer.

    The packet must carry hop limit / TTL 255, the VPP session interface's
    local address as source and its remote address as destination.

    :param test: test case providing vpp_session and the assert helpers
    :param packet: packet whose IP or IPv6 layer is checked
    """
    interface = test.vpp_session.interface
    if test.vpp_session.af == AF_INET6:
        ip_layer = packet[IPv6]
        expected_src = interface.local_ip6
        expected_dst = interface.remote_ip6
        test.assert_equal(ip_layer.hlim, 255, "IPv6 hop limit")
    else:
        ip_layer = packet[IP]
        expected_src = interface.local_ip4
        expected_dst = interface.remote_ip4
        test.assert_equal(ip_layer.ttl, 255, "IPv4 TTL")
    test.assert_equal(ip_layer.src, expected_src, "IP source address")
    test.assert_equal(ip_layer.dst, expected_dst, "IP destination address")
720
721
def verify_udp(test, packet):
    """Verify correctness of UDP layer.

    The destination port must be the BFD port and the source port must lie
    within the BFD source port range.

    :param test: test case providing the assert helpers
    :param packet: packet whose UDP layer is checked
    """
    udp_layer = packet[UDP]
    test.assert_equal(udp_layer.dport, BFD.udp_dport, "UDP destination port")
    lowest, highest = BFD.udp_sport_min, BFD.udp_sport_max
    test.assert_in_range(udp_layer.sport, lowest, highest, "UDP source port")
729
730
def verify_event(test, event, expected_state):
    """Verify correctness of event values.

    Checks the event's interface index, local and peer address against the
    VPP session of the test and its state against expected_state.

    :param test: test case providing vpp_session, logger and assert helpers
    :param event: BFD session event to check
    :param expected_state: state the event is expected to report
    """
    test.logger.debug("BFD: Event: %s", reprlib.repr(event))
    test.assert_equal(
        event.sw_if_index,
        test.vpp_session.interface.sw_if_index,
        "BFD interface index",
    )
    # the same comparison is used whatever the address family of the
    # session is, so the labels must not claim a particular family
    test.assert_equal(
        str(event.local_addr), test.vpp_session.local_addr, "Local IP address"
    )
    test.assert_equal(
        str(event.peer_addr), test.vpp_session.peer_addr, "Peer IP address"
    )
    test.assert_equal(event.state, expected_state, BFDState)
743     test.assert_equal(e.state, expected_state, BFDState)
744
745
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
    """Wait for a BFD packet on pg0 and verify its correctness.

    :param test: test case providing pg0, test_session and logger
    :param timeout: how long to wait, in seconds
    :param pcap_time_min: ignore packets with pcap timestamp lower than this
    :param is_tunnel: when set, the packet's IP layer is stripped and its
        payload is verified and returned instead

    :returns: the received packet (the stripped one if is_tunnel is set)
    :raises CaptureTimeoutError: if the deadline passes while packets are
        still being ignored
    :raises Exception: if the packet has no BFD layer or the BFD layer
        carries a payload
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.test_session.rx_packets += 1
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is None or not p.time < pcap_time_min:
            break
        test.logger.debug(
            ppp(
                "BFD: ignoring packet (pcap time %s < "
                "pcap time min %s):" % (p.time, pcap_time_min),
                p,
            )
        )
    if is_tunnel:
        # strip an IP layer and move to the next
        p = p[IP].payload

    # indexing a scapy packet with a missing layer raises IndexError rather
    # than returning None, so test for the layer before accessing it
    if not p.haslayer(BFD):
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    bfd = p[BFD]
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
790
791
# Snapshot of BFD packet counters: control rx, echo rx, control tx, echo tx
BFDStats = namedtuple("BFDStats", "rx rx_echo tx tx_echo")
793
794
def bfd_grab_stats_snapshot(test, bs_idx=0, thread_index=None):
    """Read the current BFD packet counters for one session index.

    :param test: test case exposing the ``statistics`` accessor
    :param bs_idx: second index into each counter (the BFD session index)
    :param thread_index: first index into each counter; when None the whole
        first dimension is selected and summed

    :returns: BFDStats tuple (rx, rx_echo, tx, tx_echo)
    """
    stats = test.statistics
    # an open slice is exactly what ``[:, bs_idx]`` passes as the first index
    first_index = slice(None) if thread_index is None else thread_index

    def packets(path):
        return stats[path][first_index, bs_idx].sum_packets()

    return BFDStats(
        packets("/bfd/rx-session-counters"),
        packets("/bfd/rx-session-echo-counters"),
        packets("/bfd/tx-session-counters"),
        packets("/bfd/tx-session-echo-counters"),
    )
809
810
def bfd_stats_diff(stats_before, stats_after):
    """Return how much each counter grew between two snapshots.

    :param stats_before: earlier snapshot
    :param stats_after: later snapshot

    :returns: BFDStats holding ``stats_after - stats_before`` per counter
    """
    deltas = [
        getattr(stats_after, counter) - getattr(stats_before, counter)
        for counter in ("rx", "rx_echo", "tx", "tx_echo")
    ]
    return BFDStats(*deltas)
817
818
@tag_run_solo
class BFD4TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD)"""

    # packet-generator interface; populated in setUpClass
    pg0 = None
    # subtracted from time.time() by the tests to obtain pcap timestamps;
    # not assigned in this part of the class - presumably set by
    # bfd_session_up(), TODO confirm
    vpp_clock_offset = None
    # VppBFDUDPSession configured in VPP; created in setUp
    vpp_session = None
    # BFDTestSession used to build and send the test side's packets;
    # created in setUp
    test_session = None
827
828     @classmethod
829     def setUpClass(cls):
830         super(BFD4TestCase, cls).setUpClass()
831         cls.vapi.cli("set log class bfd level debug")
832         try:
833             cls.create_pg_interfaces([0])
834             cls.create_loopback_interfaces(1)
835             cls.loopback0 = cls.lo_interfaces[0]
836             cls.loopback0.config_ip4()
837             cls.loopback0.admin_up()
838             cls.pg0.config_ip4()
839             cls.pg0.configure_ipv4_neighbors()
840             cls.pg0.admin_up()
841             cls.pg0.resolve_arp()
842
843         except Exception:
844             super(BFD4TestCase, cls).tearDownClass()
845             raise
846
    @classmethod
    def tearDownClass(cls):
        """Tear down the class fixture by delegating to the parent class."""
        super(BFD4TestCase, cls).tearDownClass()
850
851     def setUp(self):
852         super(BFD4TestCase, self).setUp()
853         self.factory = AuthKeyFactory()
854         self.vapi.want_bfd_events()
855         self.pg0.enable_capture()
856         try:
857             self.bfd_udp4_sessions = self.statistics["/bfd/udp4/sessions"]
858             self.bfd_udp6_sessions = self.statistics["/bfd/udp6/sessions"]
859             self.vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
860             self.vpp_session.add_vpp_config()
861             self.vpp_session.admin_up()
862             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
863         except BaseException:
864             self.vapi.want_bfd_events(enable_disable=0)
865             raise
866
    def tearDown(self):
        """Cancel the BFD event subscription and discard queued events."""
        # the unsubscribe API call is skipped when VPP is flagged as dead
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD4TestCase, self).tearDown()
872
873     def test_session_up(self):
874         """bring BFD session up"""
875         bfd_session_up(self)
876         bfd_udp4_sessions = self.statistics["/bfd/udp4/sessions"]
877         bfd_udp6_sessions = self.statistics["/bfd/udp6/sessions"]
878         self.assert_equal(bfd_udp4_sessions - self.bfd_udp4_sessions, 1)
879         self.assert_equal(bfd_udp6_sessions, self.bfd_udp6_sessions)
880
    def test_session_up_by_ip(self):
        """bring BFD session up - first frame looked up by address pair

        Performs the handshake by hand: the first frame is sent before VPP's
        discriminator has been learned, VPP's reply must be in Init state, and
        the following Up frame must bring the VPP session Up.
        """
        self.logger.info("BFD: Sending Slow control frame")
        # first frame goes out before VPP's discriminator is learned from the
        # reply below
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        # VPP must echo our discriminator back
        self.assert_equal(
            p[BFD].your_discriminator,
            self.test_session.my_discriminator,
            "BFD - your discriminator",
        )
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        # from now on address VPP's session by its discriminator
        self.test_session.update(
            your_discriminator=p[BFD].my_discriminator, state=BFDState.up
        )
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
909
    def test_session_down(self):
        """bring BFD session down"""
        # establish the session first, then take it down again
        bfd_session_up(self)
        bfd_session_down(self)
914
915     def test_hold_up(self):
916         """hold BFD session up"""
917         bfd_session_up(self)
918         for dummy in range(self.test_session.detect_mult * 2):
919             wait_for_bfd_packet(self)
920             self.test_session.send_packet()
921         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
922
923     def test_slow_timer(self):
924         """verify slow periodic control frames while session down"""
925         packet_count = 3
926         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
927         prev_packet = wait_for_bfd_packet(self, 2)
928         for dummy in range(packet_count):
929             next_packet = wait_for_bfd_packet(self, 2)
930             time_diff = next_packet.time - prev_packet.time
931             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
932             # to work around timing issues
933             self.assert_in_range(time_diff, 0.70, 1.05, "time between slow packets")
934             prev_packet = next_packet
935
936     def test_zero_remote_min_rx(self):
937         """no packets when zero remote required min rx interval"""
938         bfd_session_up(self)
939         self.test_session.update(required_min_rx=0)
940         self.test_session.send_packet()
941         for dummy in range(self.test_session.detect_mult):
942             self.sleep(
943                 self.vpp_session.required_min_rx / USEC_IN_SEC,
944                 "sleep before transmitting bfd packet",
945             )
946             self.test_session.send_packet()
947             try:
948                 p = wait_for_bfd_packet(self, timeout=0)
949                 self.logger.error(ppp("Received unexpected packet:", p))
950             except CaptureTimeoutError:
951                 pass
952         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
953         self.test_session.update(required_min_rx=300000)
954         for dummy in range(3):
955             self.test_session.send_packet()
956             wait_for_bfd_packet(
957                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC
958             )
959         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
960
961     def test_conn_down(self):
962         """verify session goes down after inactivity"""
963         bfd_session_up(self)
964         detection_time = (
965             self.test_session.detect_mult
966             * self.vpp_session.required_min_rx
967             / USEC_IN_SEC
968         )
969         self.sleep(detection_time, "waiting for BFD session time-out")
970         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
971         verify_event(self, e, expected_state=BFDState.down)
972
973     def test_peer_discr_reset_sess_down(self):
974         """peer discriminator reset after session goes down"""
975         bfd_session_up(self)
976         detection_time = (
977             self.test_session.detect_mult
978             * self.vpp_session.required_min_rx
979             / USEC_IN_SEC
980         )
981         self.sleep(detection_time, "waiting for BFD session time-out")
982         self.test_session.my_discriminator = 0
983         wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
984
    def test_large_required_min_rx(self):
        """large remote required min rx interval

        The test session advertises a required min rx of 3 seconds and then
        stays silent; no packet sent by VPP after the update may be seen
        before the session goes Down.
        """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        # interval is in microseconds
        interval = 3000000
        self.test_session.update(required_min_rx=interval)
        self.test_session.send_packet()
        time_mark = time.time()
        count = 0
        # busy wait here, trying to collect a packet or event, vpp is not
        # allowed to send packets and the session will timeout first - so the
        # Up->Down event must arrive before any packets do
        while time.time() < time_mark + interval / USEC_IN_SEC:
            try:
                p = wait_for_bfd_packet(self, timeout=0)
                # if vpp managed to send a packet before we did the
                # session update, then that's fine, ignore it
                if p.time < time_mark - self.vpp_clock_offset:
                    continue
                self.logger.error(ppp("Received unexpected packet:", p))
                count += 1
            except CaptureTimeoutError:
                pass
            events = self.vapi.collect_events()
            if len(events) > 0:
                # the first event seen has to be the session going Down
                verify_event(self, events[0], BFDState.down)
                break
        self.assert_equal(count, 0, "number of packets received")
1013
    def test_immediate_remote_min_rx_reduction(self):
        """immediately honor remote required min rx reduction

        The VPP session is re-created with a desired min tx of 10000 us while
        the test session first asks for 1 s intervals. After the test session
        reduces its required min rx to 300000 us, the spacing of VPP's packets
        must follow the new interval right away.
        """
        # replace the session created in setUp with one using a short
        # desired min tx
        self.vpp_session.remove_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000
        )
        self.pg0.enable_capture()
        self.vpp_session.add_vpp_config()
        self.test_session.update(desired_min_tx=1000000, required_min_rx=1000000)
        bfd_session_up(self)
        reference_packet = wait_for_bfd_packet(self)
        time_mark = time.time()
        # new required min rx in microseconds
        interval = 300000
        self.test_session.update(required_min_rx=interval)
        self.test_session.send_packet()
        # local time spent on sending the update
        extra_time = time.time() - time_mark
        p = wait_for_bfd_packet(self)
        # first packet is allowed to be late by time we spent doing the update
        # calculated in extra_time
        self.assert_in_range(
            p.time - reference_packet.time,
            0.95 * 0.75 * interval / USEC_IN_SEC,
            1.05 * interval / USEC_IN_SEC + extra_time,
            "time between BFD packets",
        )
        reference_packet = p
        # the following packets have to keep the new spacing: between 75% and
        # 100% of the interval, with a 5% tolerance on both sides
        for dummy in range(3):
            p = wait_for_bfd_packet(self)
            diff = p.time - reference_packet.time
            self.assert_in_range(
                diff,
                0.95 * 0.75 * interval / USEC_IN_SEC,
                1.05 * interval / USEC_IN_SEC,
                "time between BFD packets",
            )
            reference_packet = p
1050
    def test_modify_req_min_rx_double(self):
        """modify session - double required min rx

        After VPP's required min rx is doubled, VPP must start a poll
        sequence. Once the test side answers with a Final frame and goes
        silent, the session must go Down after the detection time computed
        from the new value.
        """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        self.test_session.update(desired_min_tx=10000, required_min_rx=10000)
        self.test_session.send_packet()
        # double required min rx
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx
        )
        # only look at packets captured after the modification
        p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet")
        # finish poll sequence with final packet
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        # expected detection time in seconds
        timeout = (
            self.test_session.detect_mult
            * max(self.test_session.desired_min_tx, self.vpp_session.required_min_rx)
            / USEC_IN_SEC
        )
        self.test_session.send_packet(final)
        time_mark = time.time()
        e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_event")
        verify_event(self, e, expected_state=BFDState.down)
        time_to_event = time.time() - time_mark
        # the Down event has to arrive within 10% of the expected timeout
        self.assert_in_range(
            time_to_event, 0.9 * timeout, 1.1 * timeout, "session timeout"
        )
1080
    def test_modify_req_min_rx_halve(self):
        """modify session - halve required min rx

        The session is brought up with a doubled required min rx, which is
        then halved. The session must stay Up for 0.8 of the old detection
        time, and after the poll sequence is finished it must time out
        according to the new value.
        """
        # start from a doubled value so that there is something to halve
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx
        )
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        self.test_session.update(desired_min_tx=10000, required_min_rx=10000)
        self.test_session.send_packet()
        p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
        # halve required min rx
        old_required_min_rx = self.vpp_session.required_min_rx
        self.vpp_session.modify_parameters(
            required_min_rx=self.vpp_session.required_min_rx // 2
        )
        # now we wait 0.8*3*old-req-min-rx and the session should still be up
        self.sleep(
            0.8 * self.vpp_session.detect_mult * old_required_min_rx / USEC_IN_SEC,
            "wait before finishing poll sequence",
        )
        self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
        p = wait_for_bfd_packet(self)
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet")
        # finish poll sequence with final packet
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        # now the session should time out under new conditions
        detection_time = (
            self.test_session.detect_mult
            * self.vpp_session.required_min_rx
            / USEC_IN_SEC
        )
        before = time.time()
        e = self.vapi.wait_for_event(2 * detection_time, "bfd_udp_session_event")
        after = time.time()
        # the event has to arrive within 10% of the new detection time
        self.assert_in_range(
            after - before,
            0.9 * detection_time,
            1.1 * detection_time,
            "time before bfd session goes down",
        )
        verify_event(self, e, expected_state=BFDState.down)
1125
1126     def test_modify_detect_mult(self):
1127         """modify detect multiplier"""
1128         bfd_session_up(self)
1129         p = wait_for_bfd_packet(self)
1130         self.vpp_session.modify_parameters(detect_mult=1)
1131         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1132         self.assert_equal(
1133             self.vpp_session.detect_mult, p[BFD].detect_mult, "detect mult"
1134         )
1135         # poll bit must not be set
1136         self.assertNotIn(
1137             "P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet"
1138         )
1139         self.vpp_session.modify_parameters(detect_mult=10)
1140         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1141         self.assert_equal(
1142             self.vpp_session.detect_mult, p[BFD].detect_mult, "detect mult"
1143         )
1144         # poll bit must not be set
1145         self.assertNotIn(
1146             "P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet"
1147         )
1148
    def test_queued_poll(self):
        """test poll sequence queueing

        A parameter change starts a poll sequence; a second change made while
        the first sequence is still unanswered must be queued. The test
        delays its Final reply for at least 0.5 s, then checks that VPP starts
        the second poll sequence no sooner than one poll-sequence length
        after the first one started, and that polling stops once the second
        sequence is answered.
        """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        # first parameter change - starts the first poll sequence
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx
        )
        p = wait_for_bfd_packet(self)
        poll_sequence_start = time.time()
        poll_sequence_length_min = 0.5
        send_final_after = time.time() + poll_sequence_length_min
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet")
        self.assert_equal(
            p[BFD].required_min_rx_interval,
            self.vpp_session.required_min_rx,
            "BFD required min rx interval",
        )
        # second parameter change while the first poll sequence is pending
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx
        )
        # 2nd poll sequence should be queued now
        # don't send the reply back yet, wait for some time to emulate
        # longer round-trip time
        packet_count = 0
        while time.time() < send_final_after:
            self.test_session.send_packet()
            p = wait_for_bfd_packet(self)
            self.assert_equal(
                len(self.vapi.collect_events()), 0, "number of bfd events"
            )
            self.assert_equal(
                p[BFD].required_min_rx_interval,
                self.vpp_session.required_min_rx,
                "BFD required min rx interval",
            )
            packet_count += 1
            # poll bit must be set
            self.assertIn(
                "P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet"
            )
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        # finish 1st with final
        poll_sequence_length = time.time() - poll_sequence_start
        # vpp must wait for some time before starting new poll sequence
        poll_no_2_started = False
        for dummy in range(2 * packet_count):
            p = wait_for_bfd_packet(self)
            self.assert_equal(
                len(self.vapi.collect_events()), 0, "number of bfd events"
            )
            if "P" in p.sprintf("%BFD.flags%"):
                poll_no_2_started = True
                # i.e. earlier than one poll-sequence length after the first
                # sequence was answered
                if time.time() < poll_sequence_start + poll_sequence_length:
                    raise Exception("VPP started 2nd poll sequence too soon")
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
                break
            else:
                self.test_session.send_packet()
        self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
        # finish 2nd with final
        # NOTE(review): a Final frame was already sent inside the loop above,
        # so this is a second one - confirm that this is intended
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        p = wait_for_bfd_packet(self)
        # poll bit must not be set
        self.assertNotIn("P", p.sprintf("%BFD.flags%"), "Poll bit set in BFD packet")
1220
1221     # returning inconsistent results requiring retries in per-patch tests
1222     @unittest.skipUnless(config.extended, "part of extended tests")
1223     def test_poll_response(self):
1224         """test correct response to control frame with poll bit set"""
1225         bfd_session_up(self)
1226         poll = self.test_session.create_packet()
1227         poll[BFD].flags = "P"
1228         self.test_session.send_packet(poll)
1229         final = wait_for_bfd_packet(
1230             self, pcap_time_min=time.time() - self.vpp_clock_offset
1231         )
1232         self.assertIn("F", final.sprintf("%BFD.flags%"))
1233
    def test_no_periodic_if_remote_demand(self):
        """no periodic frames outside poll sequence if remote demand set

        The test side keeps sending frames with the D flag; during that time
        neither packets from VPP nor BFD events are expected.
        """
        bfd_session_up(self)
        demand = self.test_session.create_packet()
        demand[BFD].flags = "D"
        self.test_session.send_packet(demand)
        # send our frames slightly faster (0.9x) than the negotiated interval
        transmit_time = (
            0.9
            * max(self.vpp_session.required_min_rx, self.test_session.desired_min_tx)
            / USEC_IN_SEC
        )
        count = 0
        for dummy in range(self.test_session.detect_mult * 2):
            self.sleep(transmit_time)
            self.test_session.send_packet(demand)
            try:
                p = wait_for_bfd_packet(self, timeout=0)
                self.logger.error(ppp("Received unexpected packet:", p))
                count += 1
            except CaptureTimeoutError:
                # nothing captured - this is the expected outcome
                pass
        events = self.vapi.collect_events()
        for e in events:
            self.logger.error("Received unexpected event: %s", e)
        self.assert_equal(count, 0, "number of packets received")
        self.assert_equal(len(events), 0, "number of events received")
1260
    def test_echo_looped_back(self):
        """echo packets looped back

        Injects UDP packets addressed to the BFD echo port with both source
        and destination IP set to pg0's remote address, and expects each of
        them to come back out of pg0 with the same payload. The UDP source
        port is incremented per packet and serves as a sequence identifier.
        BFD control packets captured meanwhile are counted and compared with
        the TX counter.
        """
        bfd_session_up(self)
        stats_before = bfd_grab_stats_snapshot(self)
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        echo_packet = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4)
            / UDP(dport=BFD.udp_dport_echo)
            / Raw("this should be looped back")
        )
        for dummy in range(echo_packet_count):
            self.sleep(0.01, "delay between echo packets")
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
            self.logger.debug(self.vapi.ppcli("show trace"))
        counter = 0
        bfd_control_packets_rx = 0
        # read the capture until all echo packets have been seen
        while counter < echo_packet_count:
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac, ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IP]
            self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
            udp = p[UDP]
            # control packets of the running session are only counted
            if udp.dport == BFD.udp_dport:
                bfd_control_packets_rx += 1
                continue
            self.assert_equal(self.pg0.remote_ip4, ip.src, "Source IP")
            self.assert_equal(udp.dport, BFD.udp_dport_echo, "UDP destination port")
            # echo packets have to come back in the order they were sent
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(
                scapy.compat.raw(p[UDP].payload),
                scapy.compat.raw(echo_packet[UDP].payload),
                "Received packet is not the echo packet sent",
            )
            counter += 1
        self.assert_equal(
            udp_sport_tx,
            udp_sport_rx,
            "UDP source port (== ECHO packet identifier for test purposes)",
        )
        stats_after = bfd_grab_stats_snapshot(self)
        diff = bfd_stats_diff(stats_before, stats_after)
        # the injected packets must not be accounted to the BFD session
        self.assertEqual(0, diff.rx, "RX counter bumped but no BFD packets sent")
        self.assertEqual(bfd_control_packets_rx, diff.tx, "TX counter incorrect")
        self.assertEqual(
            0, diff.rx_echo, "RX echo counter bumped but no BFD session exists"
        )
        self.assertEqual(
            0, diff.tx_echo, "TX echo counter bumped but no BFD session exists"
        )
1325
    def test_echo(self):
        """echo function

        The test session advertises a non-zero required min echo rx. While no
        echo source is configured in VPP, VPP must keep advertising its normal
        required min rx. After loopback0 is set as the echo source, echo
        packets are expected; they are looped back to VPP, and control
        packets seen afterwards must advertise a required min rx of at least
        1 s. Finally the session counters are compared with the packet counts
        kept by the test session.
        """
        stats_before = bfd_grab_stats_snapshot(self)
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = (
            self.test_session.detect_mult
            * self.vpp_session.required_min_rx
            / USEC_IN_SEC
        )
        # echo shouldn't work without echo source set
        for dummy in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(
            p[BFD].required_min_rx_interval,
            self.vpp_session.required_min_rx,
            "BFD required min rx interval",
        )
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for dummy in range(3):
            # process packets for 0.75 of the detection time, then send a
            # control frame of our own
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
                    self.assertNotEqual(
                        p[IP].src,
                        self.loopback0.local_ip4,
                        "BFD ECHO src IP equal to loopback IP",
                    )
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(
                        p[Ether].dst,
                        self.pg0.remote_mac,
                        "ECHO packet destination MAC address",
                    )
                    # rewrite the destination MAC so that VPP accepts the
                    # packet when it is sent back
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.test_session.rx_packets_echo += 1
                    self.test_session.tx_packets_echo += 1
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    self.test_session.rx_packets += 1
                    if echo_seen:
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval, 1000000
                        )
                    # answer a poll sequence with a Final frame
                    if "P" in p.sprintf("%BFD.flags%"):
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(
                    len(self.vapi.collect_events()), 0, "number of bfd events"
                )
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

        stats_after = bfd_grab_stats_snapshot(self)
        diff = bfd_stats_diff(stats_before, stats_after)
        # our rx is vpp tx and vice versa, also tolerate one packet off
        self.assert_in_range(
            self.test_session.tx_packets, diff.rx - 1, diff.rx + 1, "RX counter"
        )
        self.assert_in_range(
            self.test_session.rx_packets, diff.tx - 1, diff.tx + 1, "TX counter"
        )
        self.assert_in_range(
            self.test_session.tx_packets_echo,
            diff.rx_echo - 1,
            diff.rx_echo + 1,
            "RX echo counter",
        )
        self.assert_in_range(
            self.test_session.rx_packets_echo,
            diff.tx_echo - 1,
            diff.tx_echo + 1,
            "TX echo counter",
        )
1416
    def test_echo_fail(self):
        """session goes down if echo function fails

        Echo packets sent by VPP are never looped back. VPP is expected to
        report the session Down with the "echo function failed" diagnostic
        code and to raise exactly one BFD event.
        """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = (
            self.test_session.detect_mult
            * self.vpp_session.required_min_rx
            / USEC_IN_SEC
        )
        self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
        # echo function should be used now, but we will drop the echo packets
        verified_diag = False
        for dummy in range(3):
            # process packets for 0.75 of the detection time, then send a
            # control frame of our own
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    # dropped
                    pass
                elif p.haslayer(BFD):
                    # answer a poll sequence with a Final frame
                    if "P" in p.sprintf("%BFD.flags%"):
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval, 1000000
                        )
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                    # a Down packet has to carry the echo failure diagnostic
                    if p[BFD].state == BFDState.down:
                        self.assert_equal(
                            p[BFD].diag, BFDDiagCode.echo_function_failed, BFDDiagCode
                        )
                        verified_diag = True
                else:
                    raise Exception(ppp("Received unknown packet:", p))
            self.test_session.send_packet()
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 1, "number of bfd events")
        self.assert_equal(events[0].state, BFDState.down, BFDState)
        self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1458
    def test_echo_stop(self):
        """echo function stops if peer sets required min echo rx zero

        Waits for the first echo packet and loops it back, then advertises a
        required min echo rx of zero. Afterwards only valid BFD control
        packets are expected and no BFD event may be raised.
        """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
        # wait for first echo packet
        while True:
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            if p[UDP].dport == BFD.udp_dport_echo:
                self.logger.debug(ppp("Looping back packet:", p))
                # rewrite the destination MAC before sending the packet back
                p[Ether].dst = self.pg0.local_mac
                self.pg0.add_stream(p)
                self.pg_start()
                break
            elif p.haslayer(BFD):
                # ignore BFD
                pass
            else:
                raise Exception(ppp("Received unknown packet:", p))
        self.test_session.update(required_min_echo_rx=0)
        self.test_session.send_packet()
        # echo packets shouldn't arrive anymore
        for dummy in range(5):
            # wait_for_bfd_packet() raises for anything that is not a valid
            # BFD control packet
            wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
            self.test_session.send_packet()
            events = self.vapi.collect_events()
            self.assert_equal(len(events), 0, "number of bfd events")
1488
1489     def test_echo_source_removed(self):
1490         """echo function stops if echo source is removed"""
1491         bfd_session_up(self)
1492         self.test_session.update(required_min_echo_rx=150000)
1493         self.test_session.send_packet()
1494         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1495         # wait for first echo packet
1496         while True:
1497             p = self.pg0.wait_for_packet(1)
1498             self.logger.debug(ppp("Got packet:", p))
1499             if p[UDP].dport == BFD.udp_dport_echo:
1500                 self.logger.debug(ppp("Looping back packet:", p))
1501                 p[Ether].dst = self.pg0.local_mac
1502                 self.pg0.add_stream(p)
1503                 self.pg_start()
1504                 break
1505             elif p.haslayer(BFD):
1506                 # ignore BFD
1507                 pass
1508             else:
1509                 raise Exception(ppp("Received unknown packet:", p))
1510         self.vapi.bfd_udp_del_echo_source()
1511         self.test_session.send_packet()
1512         # echo packets shouldn't arrive anymore
1513         for dummy in range(5):
1514             wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1515             self.test_session.send_packet()
1516             events = self.vapi.collect_events()
1517             self.assert_equal(len(events), 0, "number of bfd events")
1518
1519     def test_stale_echo(self):
1520         """stale echo packets don't keep a session up"""
1521         bfd_session_up(self)
1522         self.test_session.update(required_min_echo_rx=150000)
1523         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1524         self.test_session.send_packet()
1525         # should be turned on - loopback echo packets
1526         echo_packet = None
1527         timeout_at = None
1528         timeout_ok = False
1529         for dummy in range(10 * self.vpp_session.detect_mult):
1530             p = self.pg0.wait_for_packet(1)
1531             if p[UDP].dport == BFD.udp_dport_echo:
1532                 if echo_packet is None:
1533                     self.logger.debug(ppp("Got first echo packet:", p))
1534                     echo_packet = p
1535                     timeout_at = (
1536                         time.time()
1537                         + self.vpp_session.detect_mult
1538                         * self.test_session.required_min_echo_rx
1539                         / USEC_IN_SEC
1540                     )
1541                 else:
1542                     self.logger.debug(ppp("Got followup echo packet:", p))
1543                 self.logger.debug(ppp("Looping back first echo packet:", p))
1544                 echo_packet[Ether].dst = self.pg0.local_mac
1545                 self.pg0.add_stream(echo_packet)
1546                 self.pg_start()
1547             elif p.haslayer(BFD):
1548                 self.logger.debug(ppp("Got packet:", p))
1549                 if "P" in p.sprintf("%BFD.flags%"):
1550                     final = self.test_session.create_packet()
1551                     final[BFD].flags = "F"
1552                     self.test_session.send_packet(final)
1553                 if p[BFD].state == BFDState.down:
1554                     self.assertIsNotNone(
1555                         timeout_at,
1556                         "Session went down before first echo packet received",
1557                     )
1558                     now = time.time()
1559                     self.assertGreaterEqual(
1560                         now,
1561                         timeout_at,
1562                         "Session timeout at %s, but is expected at %s"
1563                         % (now, timeout_at),
1564                     )
1565                     self.assert_equal(
1566                         p[BFD].diag, BFDDiagCode.echo_function_failed, BFDDiagCode
1567                     )
1568                     events = self.vapi.collect_events()
1569                     self.assert_equal(len(events), 1, "number of bfd events")
1570                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1571                     timeout_ok = True
1572                     break
1573             else:
1574                 raise Exception(ppp("Received unknown packet:", p))
1575             self.test_session.send_packet()
1576         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1577
1578     def test_invalid_echo_checksum(self):
1579         """echo packets with invalid checksum don't keep a session up"""
1580         bfd_session_up(self)
1581         self.test_session.update(required_min_echo_rx=150000)
1582         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1583         self.test_session.send_packet()
1584         # should be turned on - loopback echo packets
1585         timeout_at = None
1586         timeout_ok = False
1587         for dummy in range(10 * self.vpp_session.detect_mult):
1588             p = self.pg0.wait_for_packet(1)
1589             if p[UDP].dport == BFD.udp_dport_echo:
1590                 self.logger.debug(ppp("Got echo packet:", p))
1591                 if timeout_at is None:
1592                     timeout_at = (
1593                         time.time()
1594                         + self.vpp_session.detect_mult
1595                         * self.test_session.required_min_echo_rx
1596                         / USEC_IN_SEC
1597                     )
1598                 p[BFD_vpp_echo].checksum = getrandbits(64)
1599                 p[Ether].dst = self.pg0.local_mac
1600                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1601                 self.pg0.add_stream(p)
1602                 self.pg_start()
1603             elif p.haslayer(BFD):
1604                 self.logger.debug(ppp("Got packet:", p))
1605                 if "P" in p.sprintf("%BFD.flags%"):
1606                     final = self.test_session.create_packet()
1607                     final[BFD].flags = "F"
1608                     self.test_session.send_packet(final)
1609                 if p[BFD].state == BFDState.down:
1610                     self.assertIsNotNone(
1611                         timeout_at,
1612                         "Session went down before first echo packet received",
1613                     )
1614                     now = time.time()
1615                     self.assertGreaterEqual(
1616                         now,
1617                         timeout_at,
1618                         "Session timeout at %s, but is expected at %s"
1619                         % (now, timeout_at),
1620                     )
1621                     self.assert_equal(
1622                         p[BFD].diag, BFDDiagCode.echo_function_failed, BFDDiagCode
1623                     )
1624                     events = self.vapi.collect_events()
1625                     self.assert_equal(len(events), 1, "number of bfd events")
1626                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1627                     timeout_ok = True
1628                     break
1629             else:
1630                 raise Exception(ppp("Received unknown packet:", p))
1631             self.test_session.send_packet()
1632         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1633
1634     def test_admin_up_down(self):
1635         """put session admin-up and admin-down"""
1636         bfd_session_up(self)
1637         self.vpp_session.admin_down()
1638         self.pg0.enable_capture()
1639         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1640         verify_event(self, e, expected_state=BFDState.admin_down)
1641         for dummy in range(2):
1642             p = wait_for_bfd_packet(self)
1643             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1644         # try to bring session up - shouldn't be possible
1645         self.test_session.update(state=BFDState.init)
1646         self.test_session.send_packet()
1647         for dummy in range(2):
1648             p = wait_for_bfd_packet(self)
1649             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1650         self.vpp_session.admin_up()
1651         self.test_session.update(state=BFDState.down)
1652         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1653         verify_event(self, e, expected_state=BFDState.down)
1654         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1655         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1656         self.test_session.send_packet()
1657         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1658         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1659         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1660         verify_event(self, e, expected_state=BFDState.init)
1661         self.test_session.update(state=BFDState.up)
1662         self.test_session.send_packet()
1663         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1664         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1665         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1666         verify_event(self, e, expected_state=BFDState.up)
1667
1668     def test_config_change_remote_demand(self):
1669         """configuration change while peer in demand mode"""
1670         bfd_session_up(self)
1671         demand = self.test_session.create_packet()
1672         demand[BFD].flags = "D"
1673         self.test_session.send_packet(demand)
1674         self.vpp_session.modify_parameters(
1675             required_min_rx=2 * self.vpp_session.required_min_rx
1676         )
1677         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1678         # poll bit must be set
1679         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1680         # terminate poll sequence
1681         final = self.test_session.create_packet()
1682         final[BFD].flags = "D+F"
1683         self.test_session.send_packet(final)
1684         # vpp should be quiet now again
1685         transmit_time = (
1686             0.9
1687             * max(self.vpp_session.required_min_rx, self.test_session.desired_min_tx)
1688             / USEC_IN_SEC
1689         )
1690         count = 0
1691         for dummy in range(self.test_session.detect_mult * 2):
1692             self.sleep(transmit_time)
1693             self.test_session.send_packet(demand)
1694             try:
1695                 p = wait_for_bfd_packet(self, timeout=0)
1696                 self.logger.error(ppp("Received unexpected packet:", p))
1697                 count += 1
1698             except CaptureTimeoutError:
1699                 pass
1700         events = self.vapi.collect_events()
1701         for e in events:
1702             self.logger.error("Received unexpected event: %s", e)
1703         self.assert_equal(count, 0, "number of packets received")
1704         self.assert_equal(len(events), 0, "number of events received")
1705
1706     def test_intf_deleted(self):
1707         """interface with bfd session deleted"""
1708         intf = VppLoInterface(self)
1709         intf.config_ip4()
1710         intf.admin_up()
1711         sw_if_index = intf.sw_if_index
1712         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1713         vpp_session.add_vpp_config()
1714         vpp_session.admin_up()
1715         intf.remove_vpp_config()
1716         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1717         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1718         self.assertFalse(vpp_session.query_vpp_config())
1719
1720
@tag_run_solo
@tag_fixme_vpp_workers
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6)"""

    # Populated by setUpClass()/setUp(); declared here so the attributes
    # always exist on the class.
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        # One pg interface and one loopback, both configured for IPv6.
        super(BFD6TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            pg = cls.pg0
            pg.config_ip6()
            pg.configure_ipv6_neighbors()
            pg.admin_up()
            pg.resolve_ndp()

            cls.create_loopback_interfaces(1)
            loopback = cls.lo_interfaces[0]
            cls.loopback0 = loopback
            loopback.config_ip6()
            loopback.admin_up()
        except Exception:
            # undo the parent class setup before propagating the failure
            super(BFD6TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFD6TestCase, cls).tearDownClass()

    def setUp(self):
        # Per-test fixture: event subscription, session counters baseline and
        # an IPv6 BFD session on pg0 together with its test-side counterpart.
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            stats = self.statistics
            self.bfd_udp4_sessions = stats["/bfd/udp4/sessions"]
            self.bfd_udp6_sessions = stats["/bfd/udp6/sessions"]
            self.vpp_session = VppBFDUDPSession(
                self, self.pg0, self.pg0.remote_ip6, af=AF_INET6
            )
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except BaseException:
            # do not leave the event subscription behind on a failed setup
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """bring BFD session up"""
        bfd_session_up(self)
        # compared with the values recorded in setUp(): the udp4 counter is
        # unchanged and the udp6 counter is one higher
        udp4_now = self.statistics["/bfd/udp4/sessions"]
        udp6_now = self.statistics["/bfd/udp6/sessions"]
        self.assert_equal(udp4_now, self.bfd_udp4_sessions)
        self.assert_equal(udp6_now - self.bfd_udp6_sessions, 1)

    def test_session_up_by_ip(self):
        """bring BFD session up - first frame looked up by address pair"""
        log = self.logger
        log.info("BFD: Sending Slow control frame")
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        reply = self.pg0.wait_for_packet(1)
        # VPP must echo our discriminator back and report Init
        self.assert_equal(
            reply[BFD].your_discriminator,
            self.test_session.my_discriminator,
            "BFD - your discriminator",
        )
        self.assert_equal(reply[BFD].state, BFDState.init, BFDState)
        # adopt VPP's discriminator and switch the test peer to Up
        self.test_session.update(
            your_discriminator=reply[BFD].my_discriminator, state=BFDState.up
        )
        log.info("BFD: Waiting for event")
        event = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, event, expected_state=BFDState.init)
        log.info("BFD: Sending Up")
        self.test_session.send_packet()
        log.info("BFD: Waiting for event")
        event = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, event, expected_state=BFDState.up)
        log.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up(self):
        """hold BFD session up"""
        bfd_session_up(self)
        # answer every packet from VPP for twice the detect multiplier
        rounds = self.test_session.detect_mult * 2
        for _ in range(rounds):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        pending = self.vapi.collect_events()
        self.assert_equal(len(pending), 0, "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """echo packets looped back"""
        bfd_session_up(self)
        stats_before = bfd_grab_stats_snapshot(self)
        self.pg0.enable_capture()
        total_echoes = 10
        # random source port low enough to increment a few times..
        next_tx_sport = randint(1, 50000)
        expected_rx_sport = next_tx_sport
        # source and destination are both the remote address; the UDP source
        # port serves as a per-packet identifier
        echo_packet = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.remote_ip6)
            / UDP(dport=BFD.udp_dport_echo)
            / Raw("this should be looped back")
        )
        for _ in range(total_echoes):
            self.sleep(0.01, "delay between echo packets")
            echo_packet[UDP].sport = next_tx_sport
            next_tx_sport += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()

        echoes_seen = 0
        control_packets_seen = 0
        while echoes_seen < total_echoes:
            pkt = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", pkt))
            ether = pkt[Ether]
            self.assert_equal(self.pg0.remote_mac, ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = pkt[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            udp = pkt[UDP]
            if udp.dport == BFD.udp_dport:
                # regular control packet: only counted for the TX check below
                control_packets_seen += 1
                continue
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            self.assert_equal(udp.dport, BFD.udp_dport_echo, "UDP destination port")
            self.assert_equal(udp.sport, expected_rx_sport, "UDP source port")
            expected_rx_sport += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            received_payload = scapy.compat.raw(udp.payload)
            sent_payload = scapy.compat.raw(echo_packet[UDP].payload)
            self.assertEqual(
                received_payload,
                sent_payload,
                "Received packet is not the echo packet sent",
            )
            echoes_seen += 1

        self.assert_equal(
            next_tx_sport,
            expected_rx_sport,
            "UDP source port (== ECHO packet identifier for test purposes)",
        )
        stats_after = bfd_grab_stats_snapshot(self)
        diff = bfd_stats_diff(stats_before, stats_after)
        self.assertEqual(0, diff.rx, "RX counter bumped but no BFD packets sent")
        self.assertEqual(control_packets_seen, diff.tx, "TX counter incorrect")
        self.assertEqual(
            0, diff.rx_echo, "RX echo counter bumped but no BFD session exists"
        )
        self.assertEqual(
            0, diff.tx_echo, "TX echo counter bumped but no BFD session exists"
        )

    def test_echo(self):
        """echo function"""
        stats_before = bfd_grab_stats_snapshot(self)
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = (
            self.test_session.detect_mult
            * self.vpp_session.required_min_rx
            / USEC_IN_SEC
        )
        # echo shouldn't work without echo source set
        for _ in range(10):
            pause = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(pause, "delay before sending bfd packet")
            self.test_session.send_packet()
        ctrl = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset
        )
        self.assert_equal(
            ctrl[BFD].required_min_rx_interval,
            self.vpp_session.required_min_rx,
            "BFD required min rx interval",
        )
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for _ in range(3):
            deadline = time.time() + 0.75 * detection_time
            while time.time() < deadline:
                pkt = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", pkt))
                if pkt[UDP].dport == BFD.udp_dport_echo:
                    ip6 = pkt[IPv6]
                    self.assert_equal(ip6.dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(
                        ip6.src,
                        self.loopback0.local_ip6,
                        "BFD ECHO src IP equal to loopback IP",
                    )
                    self.logger.debug(ppp("Looping back packet:", pkt))
                    self.assert_equal(
                        pkt[Ether].dst,
                        self.pg0.remote_mac,
                        "ECHO packet destination MAC address",
                    )
                    # every echo is both received and sent back by the test
                    self.test_session.rx_packets_echo += 1
                    self.test_session.tx_packets_echo += 1
                    pkt[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(pkt)
                    self.pg_start()
                    echo_seen = True
                elif pkt.haslayer(BFD):
                    self.test_session.rx_packets += 1
                    if echo_seen:
                        # once echo is seen, control packets must advertise a
                        # required min rx of at least 1000000
                        self.assertGreaterEqual(
                            pkt[BFD].required_min_rx_interval, 1000000
                        )
                    if "P" in pkt.sprintf("%BFD.flags%"):
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", pkt))

                pending = self.vapi.collect_events()
                self.assert_equal(len(pending), 0, "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

        stats_after = bfd_grab_stats_snapshot(self)
        diff = bfd_stats_diff(stats_before, stats_after)
        # our rx is vpp tx and vice versa, also tolerate one packet off
        counter_checks = (
            (self.test_session.tx_packets, diff.rx, "RX counter"),
            (self.test_session.rx_packets, diff.tx, "TX counter"),
            (self.test_session.tx_packets_echo, diff.rx_echo, "RX echo counter"),
            (self.test_session.rx_packets_echo, diff.tx_echo, "TX echo counter"),
        )
        for test_side, vpp_side, label in counter_checks:
            self.assert_in_range(test_side, vpp_side - 1, vpp_side + 1, label)

    def test_intf_deleted(self):
        """interface with bfd session deleted"""
        # loopback carrying an IPv6 BFD session; remember its index, then
        # delete it and expect an event plus the session gone from VPP config
        loop_if = VppLoInterface(self)
        loop_if.config_ip6()
        loop_if.admin_up()
        removed_index = loop_if.sw_if_index
        session = VppBFDUDPSession(self, loop_if, loop_if.remote_ip6, af=AF_INET6)
        session.add_vpp_config()
        session.admin_up()
        loop_if.remove_vpp_config()
        event = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        self.assert_equal(event.sw_if_index, removed_index, "sw_if_index")
        self.assertFalse(session.query_vpp_config())
1995
1996
@tag_run_solo
class BFDFIBTestCase(VppTestCase):
    """BFD-FIB interactions (IPv6)"""

    # assigned inside the test once the routes have been configured
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDFIBTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(BFDFIBTestCase, cls).tearDownClass()

    def setUp(self):
        # single pg interface, IPv6 configured, BFD events enabled
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.configure_ipv6_neighbors()

    def tearDown(self):
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=False)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """Capture filter: True for BFD packets and for whatever
        is_ipv6_misc() matches, False for everything else."""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_session_with_fib(self):
        """BFD-FIB interactions"""

        # packets to match against both of the routes
        packets = [
            (
                Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
                / IPv6(src="3001::1", dst=destination)
                / UDP(sport=1234, dport=1234)
                / Raw(b"\xa5" * 100)
            )
            for destination in ("2001::1", "2002::1")
        ]

        def send_and_expect_forwarded():
            # inject both packets and expect each of them back on pg0, in
            # order, ignoring whatever the capture filter matches
            self.pg0.add_stream(packets)
            self.pg_start()
            for sent in packets:
                captured = self.pg0.wait_for_packet(
                    1, filter_out_fn=self.pkt_is_not_data_traffic
                )
                self.assertEqual(captured[IPv6].dst, sent[IPv6].dst)

        # A recursive and a non-recursive route via a next-hop that
        # will have a BFD session
        via_interface = VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)
        ip_2001_s_64 = VppIpRoute(self, "2001::", 64, [via_interface])
        via_next_hop_only = VppRoutePath(self.pg0.remote_ip6, 0xFFFFFFFF)
        ip_2002_s_64 = VppIpRoute(self, "2002::", 64, [via_next_hop_only])
        ip_2001_s_64.add_vpp_config()
        ip_2002_s_64.add_vpp_config()

        # bring the session up now the routes are present
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6
        )
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        send_and_expect_forwarded()

        # session is down - traffic is dropped
        bfd_session_down(self)

        self.pg0.add_stream(packets)
        self.pg_start()
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)

        # session is up again - traffic passes
        bfd_session_up(self)
        send_and_expect_forwarded()
2107
2108
@unittest.skipUnless(config.extended, "part of extended tests")
class BFDTunTestCase(VppTestCase):
    """BFD over GRE tunnel"""

    # assigned inside the test once the GRE interface exists
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDTunTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(BFDTunTestCase, cls).tearDownClass()

    def setUp(self):
        # single pg interface, IPv4 configured, BFD events enabled
        super(BFDTunTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()

    def tearDown(self):
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDTunTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """Capture filter: True for BFD packets and for whatever
        is_ipv6_misc() matches, False for everything else."""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_bfd_o_gre(self):
        """BFD-o-GRE"""

        # A GRE interface over which to run a BFD session
        gre_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
        gre_if.add_vpp_config()
        gre_if.admin_up()
        gre_if.config_ip4()

        # VPP side of the session, bound to the GRE interface
        self.vpp_session = VppBFDUDPSession(
            self, gre_if, gre_if.remote_ip4, is_tunnel=True
        )
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()

        # test side of the session: an IP/GRE header built from pg0's
        # addresses is handed over as tunnel_header, pg0 as phy_interface
        outer_header = IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / GRE()
        self.test_session = BFDTestSession(
            self,
            gre_if,
            AF_INET,
            tunnel_header=outer_header,
            phy_interface=self.pg0,
        )

        # one plain IPv4/UDP packet addressed to the GRE interface's remote IP
        packets = [
            (
                Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
                / IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4)
                / UDP(sport=1234, dport=1234)
                / Raw(b"\xa5" * 100)
            )
        ]

        # session is up - traffic passes
        bfd_session_up(self)

        self.send_and_expect(self.pg0, packets, self.pg0)

        # bring session down
        bfd_session_down(self)
2190
2191
@tag_run_solo
class BFDSHA1TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (SHA1 auth)"""

    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """Enable BFD debug logging and bring up pg0 with IPv4 configured."""
        super().setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()
        except Exception:
            # undo the base-class setup before propagating the failure
            super().tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the framework base class."""
        super().tearDownClass()

    def setUp(self):
        """Create a fresh key factory, subscribe to BFD events, start capture."""
        super().setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """Unsubscribe from BFD events (only if VPP is alive) and clean up."""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super().tearDown()

    # -- shared setup helpers ------------------------------------------------

    def _new_key(self, auth_type=BFDAuthType.keyed_sha1):
        """Create a random auth key of `auth_type` and add it to VPP."""
        key = self.factory.create_random_key(self, auth_type)
        key.add_vpp_config()
        return key

    def _new_vpp_session(self, key):
        """Build (without configuring) a VPP session on pg0 using `key`."""
        return VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=key)

    def _new_test_session(self, key, bfd_key_id, **kwargs):
        """Build a test-side IPv4 session on pg0 using `key`/`bfd_key_id`.

        Extra keyword arguments are passed through to BFDTestSession.
        """
        return BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=bfd_key_id, **kwargs
        )

    def _bring_up_authenticated_session(
        self, auth_type=BFDAuthType.keyed_sha1, admin_up=False, **test_session_kwargs
    ):
        """Create key + both session ends and bring the session up.

        Stores the sessions in self.vpp_session / self.test_session.

        :param auth_type: authentication type of the generated key
        :param admin_up: also call admin_up() on the VPP session after
            adding its config
        :param test_session_kwargs: extra BFDTestSession keyword arguments
        """
        key = self._new_key(auth_type)
        self.vpp_session = self._new_vpp_session(key)
        self.vpp_session.add_vpp_config()
        if admin_up:
            self.vpp_session.admin_up()
        self.test_session = self._new_test_session(
            key, self.vpp_session.bfd_key_id, **test_session_kwargs
        )
        bfd_session_up(self)

    def _detection_time(self):
        """Return test detect_mult * VPP required_min_rx, in seconds."""
        return (
            self.test_session.detect_mult
            * self.vpp_session.required_min_rx
            / USEC_IN_SEC
        )

    # -- tests ---------------------------------------------------------------

    def test_session_up(self):
        """bring BFD session up"""
        self._bring_up_authenticated_session(admin_up=True)

    def test_hold_up(self):
        """hold BFD session up"""
        self._bring_up_authenticated_session(admin_up=True)
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up_meticulous(self):
        """hold BFD session up - meticulous auth"""
        # specify sequence number so that it wraps
        self._bring_up_authenticated_session(
            BFDAuthType.meticulous_keyed_sha1,
            admin_up=True,
            our_seq_number=0xFFFFFFFF - 4,
        )
        for _ in range(30):
            wait_for_bfd_packet(self)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_send_bad_seq_number(self):
        """session is not kept alive by msgs with bad sequence numbers"""
        self._bring_up_authenticated_session(BFDAuthType.meticulous_keyed_sha1)
        send_until = time.time() + 2 * self._detection_time()
        packet_interval = 0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC
        # keep sending, but never increment the sequence number
        while time.time() < send_until:
            self.test_session.send_packet()
            self.sleep(packet_interval, "time between bfd packets")
        events = self.vapi.collect_events()
        # session should be down now, because the sequence numbers weren't
        # updated
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)

    def execute_rogue_session_scenario(
        self,
        vpp_bfd_udp_session,
        legitimate_test_session,
        rogue_test_session,
        rogue_bfd_values=None,
    ):
        """execute a rogue session interaction scenario

        1. add the given vpp session to VPP config
        2. bring the legitimate session up
        3. copy the bfd values from legitimate session to rogue session
        4. apply rogue_bfd_values to rogue session
        5. set rogue session state to down
        6. send message to take the session down from the rogue session
        7. assert that the legitimate session is unaffected

        :param vpp_bfd_udp_session: VPP session object, not yet configured
        :param legitimate_test_session: test session matching the VPP session
        :param rogue_test_session: test session used to send the bogus packet
        :param rogue_bfd_values: optional dict of overrides passed to
            rogue_test_session.update()
        """
        self.vpp_session = vpp_bfd_udp_session
        self.vpp_session.add_vpp_config()
        self.test_session = legitimate_test_session
        # bring vpp session up
        bfd_session_up(self)
        # send packet from rogue session
        legit = self.test_session
        rogue_test_session.update(
            my_discriminator=legit.my_discriminator,
            your_discriminator=legit.your_discriminator,
            desired_min_tx=legit.desired_min_tx,
            required_min_rx=legit.required_min_rx,
            detect_mult=legit.detect_mult,
            diag=legit.diag,
            state=legit.state,
            auth_type=legit.auth_type,
        )
        if rogue_bfd_values:
            rogue_test_session.update(**rogue_bfd_values)
        rogue_test_session.update(state=BFDState.down)
        rogue_test_session.send_packet()
        wait_for_bfd_packet(self)
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_mismatch_auth(self):
        """session is not brought down by unauthenticated msg"""
        key = self._new_key()
        vpp_session = self._new_vpp_session(key)
        legitimate_test_session = self._new_test_session(key, vpp_session.bfd_key_id)
        # the rogue session carries no authentication at all
        rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
        self.execute_rogue_session_scenario(
            vpp_session, legitimate_test_session, rogue_test_session
        )

    def test_mismatch_bfd_key_id(self):
        """session is not brought down by msg with non-existent key-id"""
        key = self._new_key()
        vpp_session = self._new_vpp_session(key)
        # pick a different random bfd key id
        other_key_id = randint(0, 255)
        while other_key_id == vpp_session.bfd_key_id:
            other_key_id = randint(0, 255)
        legitimate_test_session = self._new_test_session(key, vpp_session.bfd_key_id)
        rogue_test_session = self._new_test_session(key, other_key_id)
        self.execute_rogue_session_scenario(
            vpp_session, legitimate_test_session, rogue_test_session
        )

    def test_mismatched_auth_type(self):
        """session is not brought down by msg with wrong auth type"""
        key = self._new_key()
        vpp_session = self._new_vpp_session(key)
        legitimate_test_session = self._new_test_session(key, vpp_session.bfd_key_id)
        rogue_test_session = self._new_test_session(key, vpp_session.bfd_key_id)
        self.execute_rogue_session_scenario(
            vpp_session,
            legitimate_test_session,
            rogue_test_session,
            {"auth_type": BFDAuthType.keyed_md5},
        )

    def test_restart(self):
        """simulate remote peer restart and resynchronization"""
        self._bring_up_authenticated_session(
            BFDAuthType.meticulous_keyed_sha1, our_seq_number=0
        )
        # don't send any packets for 2*detection_time
        self.sleep(2 * self._detection_time(), "simulating peer restart")
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)
        self.test_session.update(state=BFDState.down)
        # reset sequence number
        self.test_session.our_seq_number = 0
        self.test_session.vpp_seq_number = None
        # now throw away any pending packets
        self.pg0.enable_capture()
        self.test_session.my_discriminator = 0
        bfd_session_up(self)
2462
2463
@tag_run_solo
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth)"""

    pg0 = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """Enable BFD debug logging and bring up pg0 with IPv4 configured."""
        super().setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()
        except Exception:
            # undo the base-class setup before propagating the failure
            super().tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the framework base class."""
        super().tearDownClass()

    def setUp(self):
        """Create a fresh key factory, subscribe to BFD events, start capture."""
        super().setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """Unsubscribe from BFD events (only if VPP is alive) and clean up."""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super().tearDown()

    # -- shared helpers ------------------------------------------------------

    def _new_key(self):
        """Create a random auth key (factory default type) and add it to VPP."""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        return key

    def _bring_session_up(self, key=None, admin_up=False):
        """Create both session ends on pg0 (IPv4) and bring the session up.

        :param key: auth key for both ends; None creates both ends without
            any authentication arguments
        :param admin_up: also call admin_up() on the VPP session after
            adding its config
        """
        auth_kwargs = {} if key is None else {"sha1_key": key}
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, **auth_kwargs
        )
        self.vpp_session.add_vpp_config()
        if admin_up:
            self.vpp_session.admin_up()
        if key is not None:
            auth_kwargs["bfd_key_id"] = self.vpp_session.bfd_key_id
        self.test_session = BFDTestSession(self, self.pg0, AF_INET, **auth_kwargs)
        bfd_session_up(self)

    def _exchange_keepalives(self, verify_up=True, inc_seq_num=False):
        """Do detect_mult * 2 rounds of "wait for VPP packet, then reply".

        :param verify_up: assert each received packet reports state up
        :param inc_seq_num: increment the test session's sequence number
            before each reply
        """
        for _ in range(self.test_session.detect_mult * 2):
            pkt = wait_for_bfd_packet(self)
            if verify_up:
                self.assert_equal(pkt[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _set_test_session_auth(self, key):
        """Switch the test session to `key` (None turns authentication off).

        When a key is given, the bfd key id is taken from the VPP session.
        """
        self.test_session.bfd_key_id = (
            None if key is None else self.vpp_session.bfd_key_id
        )
        self.test_session.sha1_key = key

    def _assert_session_undisturbed(self):
        """Assert the VPP session is up and no BFD events are queued."""
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")

    # -- tests ---------------------------------------------------------------

    def test_auth_on_immediate(self):
        """turn auth on without disturbing session state (immediate)"""
        key = self._new_key()
        self._bring_session_up()
        self._exchange_keepalives()
        self.vpp_session.activate_auth(key)
        self._set_test_session_auth(key)
        self._exchange_keepalives()
        self._assert_session_undisturbed()

    def test_auth_off_immediate(self):
        """turn auth off without disturbing session state (immediate)"""
        key = self._new_key()
        self._bring_session_up(key)
        self._exchange_keepalives(inc_seq_num=True)
        self.vpp_session.deactivate_auth()
        self._set_test_session_auth(None)
        self._exchange_keepalives(inc_seq_num=True)
        self._assert_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """change auth key without disturbing session state (immediate)"""
        key1 = self._new_key()
        key2 = self._new_key()
        self._bring_session_up(key1)
        self._exchange_keepalives()
        self.vpp_session.activate_auth(key2)
        self._set_test_session_auth(key2)
        self._exchange_keepalives()
        self._assert_session_undisturbed()

    def test_auth_on_delayed(self):
        """turn auth on without disturbing session state (delayed)"""
        key = self._new_key()
        self._bring_session_up()
        self._exchange_keepalives(verify_up=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # keep talking unauthenticated after the delayed change is requested
        self._exchange_keepalives()
        # now switch the test side over to the new key
        self._set_test_session_auth(key)
        self.test_session.send_packet()
        self._exchange_keepalives()
        self._assert_session_undisturbed()

    def test_auth_off_delayed(self):
        """turn auth off without disturbing session state (delayed)"""
        key = self._new_key()
        self._bring_session_up(key)
        self._exchange_keepalives()
        self.vpp_session.deactivate_auth(delayed=True)
        # keep talking authenticated after the delayed change is requested
        self._exchange_keepalives()
        # now drop authentication on the test side
        self._set_test_session_auth(None)
        self.test_session.send_packet()
        self._exchange_keepalives()
        self._assert_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """change auth key without disturbing session state (delayed)"""
        key1 = self._new_key()
        key2 = self._new_key()
        self._bring_session_up(key1, admin_up=True)
        self._exchange_keepalives()
        self.vpp_session.activate_auth(key2, delayed=True)
        # keep using the old key after the delayed change is requested
        self._exchange_keepalives()
        # now switch the test side over to the new key
        self._set_test_session_auth(key2)
        self.test_session.send_packet()
        self._exchange_keepalives()
        self._assert_session_undisturbed()
2687
2688
2689 @tag_run_solo
2690 class BFDCLITestCase(VppTestCase):
2691     """Bidirectional Forwarding Detection (BFD) (CLI)"""
2692
2693     pg0 = None
2694
2695     @classmethod
2696     def setUpClass(cls):
2697         super(BFDCLITestCase, cls).setUpClass()
2698         cls.vapi.cli("set log class bfd level debug")
2699         try:
2700             cls.create_pg_interfaces((0,))
2701             cls.pg0.config_ip4()
2702             cls.pg0.config_ip6()
2703             cls.pg0.resolve_arp()
2704             cls.pg0.resolve_ndp()
2705
2706         except Exception:
2707             super(BFDCLITestCase, cls).tearDownClass()
2708             raise
2709
2710     @classmethod
2711     def tearDownClass(cls):
2712         super(BFDCLITestCase, cls).tearDownClass()
2713
2714     def setUp(self):
2715         super(BFDCLITestCase, self).setUp()
2716         self.factory = AuthKeyFactory()
2717         self.pg0.enable_capture()
2718
2719     def tearDown(self):
2720         try:
2721             self.vapi.want_bfd_events(enable_disable=False)
2722         except UnexpectedApiReturnValueError:
2723             # some tests aren't subscribed, so this is not an issue
2724             pass
2725         self.vapi.collect_events()  # clear the event queue
2726         super(BFDCLITestCase, self).tearDown()
2727
2728     def cli_verify_no_response(self, cli):
2729         """execute a CLI, asserting that the response is empty"""
2730         self.assert_equal(self.vapi.cli(cli), "", "CLI command response")
2731
2732     def cli_verify_response(self, cli, expected):
2733         """execute a CLI, asserting that the response matches expectation"""
2734         try:
2735             reply = self.vapi.cli(cli)
2736         except CliFailedCommandError as cli_error:
2737             reply = str(cli_error)
2738         self.assert_equal(reply.strip(), expected, "CLI command response")
2739
2740     def test_show(self):
2741         """show commands"""
2742         k1 = self.factory.create_random_key(self)
2743         k1.add_vpp_config()
2744         k2 = self.factory.create_random_key(
2745             self, auth_type=BFDAuthType.meticulous_keyed_sha1
2746         )
2747         k2.add_vpp_config()
2748         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2749         s1.add_vpp_config()
2750         s2 = VppBFDUDPSession(
2751             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=k2
2752         )
2753         s2.add_vpp_config()
2754         self.logger.info(self.vapi.ppcli("show bfd keys"))
2755         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2756         self.logger.info(self.vapi.ppcli("show bfd"))
2757
2758     def test_set_del_sha1_key(self):
2759         """set/delete SHA1 auth key"""
2760         k = self.factory.create_random_key(self)
2761         self.registry.register(k, self.logger)
2762         self.cli_verify_no_response(
2763             "bfd key set conf-key-id %s type keyed-sha1 secret %s"
2764             % (
2765                 k.conf_key_id,
2766                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key),
2767             )
2768         )
2769         self.assertTrue(k.query_vpp_config())
2770         self.vpp_session = VppBFDUDPSession(
2771             self, self.pg0, self.pg0.remote_ip4, sha1_key=k
2772         )
2773         self.vpp_session.add_vpp_config()
2774         self.test_session = BFDTestSession(
2775             self, self.pg0, AF_INET, sha1_key=k, bfd_key_id=self.vpp_session.bfd_key_id
2776         )
2777         self.vapi.want_bfd_events()
2778         bfd_session_up(self)
2779         bfd_session_down(self)
2780         # try to replace the secret for the key - should fail because the key
2781         # is in-use
2782         k2 = self.factory.create_random_key(self)
2783         self.cli_verify_response(
2784             "bfd key set conf-key-id %s type keyed-sha1 secret %s"
2785             % (
2786                 k.conf_key_id,
2787                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key),
2788             ),
2789             "bfd key set: `bfd_auth_set_key' API call failed, "
2790             "rv=-103:BFD object in use",
2791         )
2792         # manipulating the session using old secret should still work
2793         bfd_session_up(self)
2794         bfd_session_down(self)
2795         self.vpp_session.remove_vpp_config()
2796         self.cli_verify_no_response("bfd key del conf-key-id %s" % k.conf_key_id)
2797         self.assertFalse(k.query_vpp_config())
2798
2799     def test_set_del_meticulous_sha1_key(self):
2800         """set/delete meticulous SHA1 auth key"""
2801         k = self.factory.create_random_key(
2802             self, auth_type=BFDAuthType.meticulous_keyed_sha1
2803         )
2804         self.registry.register(k, self.logger)
2805         self.cli_verify_no_response(
2806             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s"
2807             % (
2808                 k.conf_key_id,
2809                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key),
2810             )
2811         )
2812         self.assertTrue(k.query_vpp_config())
2813         self.vpp_session = VppBFDUDPSession(
2814             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=k
2815         )
2816         self.vpp_session.add_vpp_config()
2817         self.vpp_session.admin_up()
2818         self.test_session = BFDTestSession(
2819             self, self.pg0, AF_INET6, sha1_key=k, bfd_key_id=self.vpp_session.bfd_key_id
2820         )
2821         self.vapi.want_bfd_events()
2822         bfd_session_up(self)
2823         bfd_session_down(self)
2824         # try to replace the secret for the key - should fail because the key
2825         # is in-use
2826         k2 = self.factory.create_random_key(self)
2827         self.cli_verify_response(
2828             "bfd key set conf-key-id %s type keyed-sha1 secret %s"
2829             % (
2830                 k.conf_key_id,
2831                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key),
2832             ),
2833             "bfd key set: `bfd_auth_set_key' API call failed, "
2834             "rv=-103:BFD object in use",
2835         )
2836         # manipulating the session using old secret should still work
2837         bfd_session_up(self)
2838         bfd_session_down(self)
2839         self.vpp_session.remove_vpp_config()
2840         self.cli_verify_no_response("bfd key del conf-key-id %s" % k.conf_key_id)
2841         self.assertFalse(k.query_vpp_config())
2842
2843     def test_add_mod_del_bfd_udp(self):
2844         """create/modify/delete IPv4 BFD UDP session"""
2845         vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2846         self.registry.register(vpp_session, self.logger)
2847         cli_add_cmd = (
2848             "bfd udp session add interface %s local-addr %s "
2849             "peer-addr %s desired-min-tx %s required-min-rx %s "
2850             "detect-mult %s"
2851             % (
2852                 self.pg0.name,
2853                 self.pg0.local_ip4,
2854                 self.pg0.remote_ip4,
2855                 vpp_session.desired_min_tx,
2856                 vpp_session.required_min_rx,
2857                 vpp_session.detect_mult,
2858             )
2859         )
2860         self.cli_verify_no_response(cli_add_cmd)
2861         # 2nd add should fail
2862         self.cli_verify_response(
2863             cli_add_cmd,
2864             "bfd udp session add: `bfd_add_add_session' API call"
2865             " failed, rv=-101:Duplicate BFD object",
2866         )
2867         verify_bfd_session_config(self, vpp_session)
2868         mod_session = VppBFDUDPSession(
2869             self,
2870             self.pg0,
2871             self.pg0.remote_ip4,
2872             required_min_rx=2 * vpp_session.required_min_rx,
2873             desired_min_tx=3 * vpp_session.desired_min_tx,
2874             detect_mult=4 * vpp_session.detect_mult,
2875         )
2876         self.cli_verify_no_response(
2877             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2878             "desired-min-tx %s required-min-rx %s detect-mult %s"
2879             % (
2880                 self.pg0.name,
2881                 self.pg0.local_ip4,
2882                 self.pg0.remote_ip4,
2883                 mod_session.desired_min_tx,
2884                 mod_session.required_min_rx,
2885                 mod_session.detect_mult,
2886             )
2887         )
2888         verify_bfd_session_config(self, mod_session)
2889         cli_del_cmd = (
2890             "bfd udp session del interface %s local-addr %s "
2891             "peer-addr %s" % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2892         )
2893         self.cli_verify_no_response(cli_del_cmd)
2894         # 2nd del is expected to fail
2895         self.cli_verify_response(
2896             cli_del_cmd,
2897             "bfd udp session del: `bfd_udp_del_session' API call"
2898             " failed, rv=-102:No such BFD object",
2899         )
2900         self.assertFalse(vpp_session.query_vpp_config())
2901
2902     def test_add_mod_del_bfd_udp6(self):
2903         """create/modify/delete IPv6 BFD UDP session"""
2904         vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2905         self.registry.register(vpp_session, self.logger)
2906         cli_add_cmd = (
2907             "bfd udp session add interface %s local-addr %s "
2908             "peer-addr %s desired-min-tx %s required-min-rx %s "
2909             "detect-mult %s"
2910             % (
2911                 self.pg0.name,
2912                 self.pg0.local_ip6,
2913                 self.pg0.remote_ip6,
2914                 vpp_session.desired_min_tx,
2915                 vpp_session.required_min_rx,
2916                 vpp_session.detect_mult,
2917             )
2918         )
2919         self.cli_verify_no_response(cli_add_cmd)
2920         # 2nd add should fail
2921         self.cli_verify_response(
2922             cli_add_cmd,
2923             "bfd udp session add: `bfd_add_add_session' API call"
2924             " failed, rv=-101:Duplicate BFD object",
2925         )
2926         verify_bfd_session_config(self, vpp_session)
2927         mod_session = VppBFDUDPSession(
2928             self,
2929             self.pg0,
2930             self.pg0.remote_ip6,
2931             af=AF_INET6,
2932             required_min_rx=2 * vpp_session.required_min_rx,
2933             desired_min_tx=3 * vpp_session.desired_min_tx,
2934             detect_mult=4 * vpp_session.detect_mult,
2935         )
2936         self.cli_verify_no_response(
2937             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2938             "desired-min-tx %s required-min-rx %s detect-mult %s"
2939             % (
2940                 self.pg0.name,
2941                 self.pg0.local_ip6,
2942                 self.pg0.remote_ip6,
2943                 mod_session.desired_min_tx,
2944                 mod_session.required_min_rx,
2945                 mod_session.detect_mult,
2946             )
2947         )
2948         verify_bfd_session_config(self, mod_session)
2949         cli_del_cmd = (
2950             "bfd udp session del interface %s local-addr %s "
2951             "peer-addr %s" % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6)
2952         )
2953         self.cli_verify_no_response(cli_del_cmd)
2954         # 2nd del is expected to fail
2955         self.cli_verify_response(
2956             cli_del_cmd,
2957             "bfd udp session del: `bfd_udp_del_session' API call"
2958             " failed, rv=-102:No such BFD object",
2959         )
2960         self.assertFalse(vpp_session.query_vpp_config())
2961
2962     def test_add_mod_del_bfd_udp_auth(self):
2963         """create/modify/delete IPv4 BFD UDP session (authenticated)"""
2964         key = self.factory.create_random_key(self)
2965         key.add_vpp_config()
2966         vpp_session = VppBFDUDPSession(
2967             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
2968         )
2969         self.registry.register(vpp_session, self.logger)
2970         cli_add_cmd = (
2971             "bfd udp session add interface %s local-addr %s "
2972             "peer-addr %s desired-min-tx %s required-min-rx %s "
2973             "detect-mult %s conf-key-id %s bfd-key-id %s"
2974             % (
2975                 self.pg0.name,
2976                 self.pg0.local_ip4,
2977                 self.pg0.remote_ip4,
2978                 vpp_session.desired_min_tx,
2979                 vpp_session.required_min_rx,
2980                 vpp_session.detect_mult,
2981                 key.conf_key_id,
2982                 vpp_session.bfd_key_id,
2983             )
2984         )
2985         self.cli_verify_no_response(cli_add_cmd)
2986         # 2nd add should fail
2987         self.cli_verify_response(
2988             cli_add_cmd,
2989             "bfd udp session add: `bfd_add_add_session' API call"
2990             " failed, rv=-101:Duplicate BFD object",
2991         )
2992         verify_bfd_session_config(self, vpp_session)
2993         mod_session = VppBFDUDPSession(
2994             self,
2995             self.pg0,
2996             self.pg0.remote_ip4,
2997             sha1_key=key,
2998             bfd_key_id=vpp_session.bfd_key_id,
2999             required_min_rx=2 * vpp_session.required_min_rx,
3000             desired_min_tx=3 * vpp_session.desired_min_tx,
3001             detect_mult=4 * vpp_session.detect_mult,
3002         )
3003         self.cli_verify_no_response(
3004             "bfd udp session mod interface %s local-addr %s peer-addr %s "
3005             "desired-min-tx %s required-min-rx %s detect-mult %s"
3006             % (
3007                 self.pg0.name,
3008                 self.pg0.local_ip4,
3009                 self.pg0.remote_ip4,
3010                 mod_session.desired_min_tx,
3011                 mod_session.required_min_rx,
3012                 mod_session.detect_mult,
3013             )
3014         )
3015         verify_bfd_session_config(self, mod_session)
3016         cli_del_cmd = (
3017             "bfd udp session del interface %s local-addr %s "
3018             "peer-addr %s" % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3019         )
3020         self.cli_verify_no_response(cli_del_cmd)
3021         # 2nd del is expected to fail
3022         self.cli_verify_response(
3023             cli_del_cmd,
3024             "bfd udp session del: `bfd_udp_del_session' API call"
3025             " failed, rv=-102:No such BFD object",
3026         )
3027         self.assertFalse(vpp_session.query_vpp_config())
3028
3029     def test_add_mod_del_bfd_udp6_auth(self):
3030         """create/modify/delete IPv6 BFD UDP session (authenticated)"""
3031         key = self.factory.create_random_key(
3032             self, auth_type=BFDAuthType.meticulous_keyed_sha1
3033         )
3034         key.add_vpp_config()
3035         vpp_session = VppBFDUDPSession(
3036             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key
3037         )
3038         self.registry.register(vpp_session, self.logger)
3039         cli_add_cmd = (
3040             "bfd udp session add interface %s local-addr %s "
3041             "peer-addr %s desired-min-tx %s required-min-rx %s "
3042             "detect-mult %s conf-key-id %s bfd-key-id %s"
3043             % (
3044                 self.pg0.name,
3045                 self.pg0.local_ip6,
3046                 self.pg0.remote_ip6,
3047                 vpp_session.desired_min_tx,
3048                 vpp_session.required_min_rx,
3049                 vpp_session.detect_mult,
3050                 key.conf_key_id,
3051                 vpp_session.bfd_key_id,
3052             )
3053         )
3054         self.cli_verify_no_response(cli_add_cmd)
3055         # 2nd add should fail
3056         self.cli_verify_response(
3057             cli_add_cmd,
3058             "bfd udp session add: `bfd_add_add_session' API call"
3059             " failed, rv=-101:Duplicate BFD object",
3060         )
3061         verify_bfd_session_config(self, vpp_session)
3062         mod_session = VppBFDUDPSession(
3063             self,
3064             self.pg0,
3065             self.pg0.remote_ip6,
3066             af=AF_INET6,
3067             sha1_key=key,
3068             bfd_key_id=vpp_session.bfd_key_id,
3069             required_min_rx=2 * vpp_session.required_min_rx,
3070             desired_min_tx=3 * vpp_session.desired_min_tx,
3071             detect_mult=4 * vpp_session.detect_mult,
3072         )
3073         self.cli_verify_no_response(
3074             "bfd udp session mod interface %s local-addr %s peer-addr %s "
3075             "desired-min-tx %s required-min-rx %s detect-mult %s"
3076             % (
3077                 self.pg0.name,
3078                 self.pg0.local_ip6,
3079                 self.pg0.remote_ip6,
3080                 mod_session.desired_min_tx,
3081                 mod_session.required_min_rx,
3082                 mod_session.detect_mult,
3083             )
3084         )
3085         verify_bfd_session_config(self, mod_session)
3086         cli_del_cmd = (
3087             "bfd udp session del interface %s local-addr %s "
3088             "peer-addr %s" % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6)
3089         )
3090         self.cli_verify_no_response(cli_del_cmd)
3091         # 2nd del is expected to fail
3092         self.cli_verify_response(
3093             cli_del_cmd,
3094             "bfd udp session del: `bfd_udp_del_session' API call"
3095             " failed, rv=-102:No such BFD object",
3096         )
3097         self.assertFalse(vpp_session.query_vpp_config())
3098
3099     def test_auth_on_off(self):
3100         """turn authentication on and off"""
3101         key = self.factory.create_random_key(
3102             self, auth_type=BFDAuthType.meticulous_keyed_sha1
3103         )
3104         key.add_vpp_config()
3105         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
3106         auth_session = VppBFDUDPSession(
3107             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
3108         )
3109         session.add_vpp_config()
3110         cli_activate = (
3111             "bfd udp session auth activate interface %s local-addr %s "
3112             "peer-addr %s conf-key-id %s bfd-key-id %s"
3113             % (
3114                 self.pg0.name,
3115                 self.pg0.local_ip4,
3116                 self.pg0.remote_ip4,
3117                 key.conf_key_id,
3118                 auth_session.bfd_key_id,
3119             )
3120         )
3121         self.cli_verify_no_response(cli_activate)
3122         verify_bfd_session_config(self, auth_session)
3123         self.cli_verify_no_response(cli_activate)
3124         verify_bfd_session_config(self, auth_session)
3125         cli_deactivate = (
3126             "bfd udp session auth deactivate interface %s local-addr %s "
3127             "peer-addr %s " % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3128         )
3129         self.cli_verify_no_response(cli_deactivate)
3130         verify_bfd_session_config(self, session)
3131         self.cli_verify_no_response(cli_deactivate)
3132         verify_bfd_session_config(self, session)
3133
3134     def test_auth_on_off_delayed(self):
3135         """turn authentication on and off (delayed)"""
3136         key = self.factory.create_random_key(
3137             self, auth_type=BFDAuthType.meticulous_keyed_sha1
3138         )
3139         key.add_vpp_config()
3140         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
3141         auth_session = VppBFDUDPSession(
3142             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
3143         )
3144         session.add_vpp_config()
3145         cli_activate = (
3146             "bfd udp session auth activate interface %s local-addr %s "
3147             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"
3148             % (
3149                 self.pg0.name,
3150                 self.pg0.local_ip4,
3151                 self.pg0.remote_ip4,
3152                 key.conf_key_id,
3153                 auth_session.bfd_key_id,
3154             )
3155         )
3156         self.cli_verify_no_response(cli_activate)
3157         verify_bfd_session_config(self, auth_session)
3158         self.cli_verify_no_response(cli_activate)
3159         verify_bfd_session_config(self, auth_session)
3160         cli_deactivate = (
3161             "bfd udp session auth deactivate interface %s local-addr %s "
3162             "peer-addr %s delayed yes"
3163             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3164         )
3165         self.cli_verify_no_response(cli_deactivate)
3166         verify_bfd_session_config(self, session)
3167         self.cli_verify_no_response(cli_deactivate)
3168         verify_bfd_session_config(self, session)
3169
3170     def test_admin_up_down(self):
3171         """put session admin-up and admin-down"""
3172         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
3173         session.add_vpp_config()
3174         cli_down = (
3175             "bfd udp session set-flags admin down interface %s local-addr %s "
3176             "peer-addr %s " % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3177         )
3178         cli_up = (
3179             "bfd udp session set-flags admin up interface %s local-addr %s "
3180             "peer-addr %s " % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3181         )
3182         self.cli_verify_no_response(cli_down)
3183         verify_bfd_session_config(self, session, state=BFDState.admin_down)
3184         self.cli_verify_no_response(cli_up)
3185         verify_bfd_session_config(self, session, state=BFDState.down)
3186
3187     def test_set_del_udp_echo_source(self):
3188         """set/del udp echo source"""
3189         self.create_loopback_interfaces(1)
3190         self.loopback0 = self.lo_interfaces[0]
3191         self.loopback0.admin_up()
3192         self.cli_verify_response("show bfd echo-source", "UDP echo source is not set.")
3193         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
3194         self.cli_verify_no_response(cli_set)
3195         self.cli_verify_response(
3196             "show bfd echo-source",
3197             "UDP echo source is: %s\n"
3198             "IPv4 address usable as echo source: none\n"
3199             "IPv6 address usable as echo source: none" % self.loopback0.name,
3200         )
3201         self.loopback0.config_ip4()
3202         echo_ip4 = str(
3203             ipaddress.IPv4Address(
3204                 int(ipaddress.IPv4Address(self.loopback0.local_ip4)) ^ 1
3205             )
3206         )
3207         self.cli_verify_response(
3208             "show bfd echo-source",
3209             "UDP echo source is: %s\n"
3210             "IPv4 address usable as echo source: %s\n"
3211             "IPv6 address usable as echo source: none"
3212             % (self.loopback0.name, echo_ip4),
3213         )
3214         echo_ip6 = str(
3215             ipaddress.IPv6Address(
3216                 int(ipaddress.IPv6Address(self.loopback0.local_ip6)) ^ 1
3217             )
3218         )
3219         self.loopback0.config_ip6()
3220         self.cli_verify_response(
3221             "show bfd echo-source",
3222             "UDP echo source is: %s\n"
3223             "IPv4 address usable as echo source: %s\n"
3224             "IPv6 address usable as echo source: %s"
3225             % (self.loopback0.name, echo_ip4, echo_ip6),
3226         )
3227         cli_del = "bfd udp echo-source del"
3228         self.cli_verify_no_response(cli_del)
3229         self.cli_verify_response("show bfd echo-source", "UDP echo source is not set.")
3230
3231
# Script entry point: when this file is executed directly (not imported),
# run the tests in this module through unittest using VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)