bfd: add tracing support to bfd-process
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python3
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 from collections import namedtuple
8 import hashlib
9 import ipaddress
10 import reprlib
11 import time
12 import unittest
13 from random import randint, shuffle, getrandbits
14 from socket import AF_INET, AF_INET6, inet_ntop
15 from struct import pack, unpack
16
17 import scapy.compat
18 from scapy.layers.inet import UDP, IP
19 from scapy.layers.inet6 import IPv6
20 from scapy.layers.l2 import Ether, GRE
21 from scapy.packet import Raw
22
23 from config import config
24 from bfd import (
25     VppBFDAuthKey,
26     BFD,
27     BFDAuthType,
28     VppBFDUDPSession,
29     BFDDiagCode,
30     BFDState,
31     BFD_vpp_echo,
32 )
33 from framework import tag_fixme_vpp_workers
34 from framework import VppTestCase, VppTestRunner
35 from framework import tag_run_solo
36 from util import ppp
37 from vpp_ip import DpoProto
38 from vpp_ip_route import VppIpRoute, VppRoutePath
39 from vpp_lo_interface import VppLoInterface
40 from vpp_papi_provider import UnexpectedApiReturnValueError, CliFailedCommandError
41 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
42 from vpp_gre_interface import VppGreInterface
43 from vpp_papi import VppEnum
44
45 USEC_IN_SEC = 1000000
46
47
class AuthKeyFactory(object):
    """Produce BFD auth keys whose conf key IDs never repeat within one factory"""

    def __init__(self):
        # conf key IDs issued so far; only the dict keys matter
        self._conf_key_ids = {}

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """Build a VppBFDAuthKey with random key material and an unused conf key ID

        :param test: test case handed through to VppBFDAuthKey
        :param auth_type: authentication type for the new key
        :returns: the newly constructed VppBFDAuthKey
        """
        # keep drawing until we hit an ID this factory has not issued yet
        while True:
            conf_key_id = randint(0, 0xFFFFFFFF)
            if conf_key_id not in self._conf_key_ids:
                break
        self._conf_key_ids[conf_key_id] = 1
        # key material is 1 to 20 random bytes
        key_length = randint(1, 20)
        key_bytes = bytearray([randint(0, 255) for _ in range(key_length)])
        return VppBFDAuthKey(
            test=test,
            auth_type=auth_type,
            conf_key_id=conf_key_id,
            key=scapy.compat.raw(key_bytes),
        )
66
67
68 class BFDAPITestCase(VppTestCase):
69     """Bidirectional Forwarding Detection (BFD) - API"""
70
71     pg0 = None
72     pg1 = None
73
74     @classmethod
75     def setUpClass(cls):
76         super(BFDAPITestCase, cls).setUpClass()
77         cls.vapi.cli("set log class bfd level debug")
78         try:
79             cls.create_pg_interfaces(range(2))
80             for i in cls.pg_interfaces:
81                 i.config_ip4()
82                 i.config_ip6()
83                 i.resolve_arp()
84
85         except Exception:
86             super(BFDAPITestCase, cls).tearDownClass()
87             raise
88
89     @classmethod
90     def tearDownClass(cls):
91         super(BFDAPITestCase, cls).tearDownClass()
92
93     def setUp(self):
94         super(BFDAPITestCase, self).setUp()
95         self.factory = AuthKeyFactory()
96
97     def test_add_bfd(self):
98         """create a BFD session"""
99         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
100         session.add_vpp_config()
101         self.logger.debug("Session state is %s", session.state)
102         session.remove_vpp_config()
103         session.add_vpp_config()
104         self.logger.debug("Session state is %s", session.state)
105         session.remove_vpp_config()
106
107     def test_double_add(self):
108         """create the same BFD session twice (negative case)"""
109         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
110         session.add_vpp_config()
111
112         with self.vapi.assert_negative_api_retval():
113             session.add_vpp_config()
114
115         session.remove_vpp_config()
116
117     def test_add_bfd6(self):
118         """create IPv6 BFD session"""
119         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
120         session.add_vpp_config()
121         self.logger.debug("Session state is %s", session.state)
122         session.remove_vpp_config()
123         session.add_vpp_config()
124         self.logger.debug("Session state is %s", session.state)
125         session.remove_vpp_config()
126
127     def test_mod_bfd(self):
128         """modify BFD session parameters"""
129         session = VppBFDUDPSession(
130             self,
131             self.pg0,
132             self.pg0.remote_ip4,
133             desired_min_tx=50000,
134             required_min_rx=10000,
135             detect_mult=1,
136         )
137         session.add_vpp_config()
138         s = session.get_bfd_udp_session_dump_entry()
139         self.assert_equal(
140             session.desired_min_tx, s.desired_min_tx, "desired min transmit interval"
141         )
142         self.assert_equal(
143             session.required_min_rx, s.required_min_rx, "required min receive interval"
144         )
145         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
146         session.modify_parameters(
147             desired_min_tx=session.desired_min_tx * 2,
148             required_min_rx=session.required_min_rx * 2,
149             detect_mult=session.detect_mult * 2,
150         )
151         s = session.get_bfd_udp_session_dump_entry()
152         self.assert_equal(
153             session.desired_min_tx, s.desired_min_tx, "desired min transmit interval"
154         )
155         self.assert_equal(
156             session.required_min_rx, s.required_min_rx, "required min receive interval"
157         )
158         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
159
160     def test_upd_bfd(self):
161         """Create/Modify w/ Update BFD session parameters"""
162         session = VppBFDUDPSession(
163             self,
164             self.pg0,
165             self.pg0.remote_ip4,
166             desired_min_tx=50000,
167             required_min_rx=10000,
168             detect_mult=1,
169         )
170         session.upd_vpp_config()
171         s = session.get_bfd_udp_session_dump_entry()
172         self.assert_equal(
173             session.desired_min_tx, s.desired_min_tx, "desired min transmit interval"
174         )
175         self.assert_equal(
176             session.required_min_rx, s.required_min_rx, "required min receive interval"
177         )
178
179         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
180         session.upd_vpp_config(
181             desired_min_tx=session.desired_min_tx * 2,
182             required_min_rx=session.required_min_rx * 2,
183             detect_mult=session.detect_mult * 2,
184         )
185         s = session.get_bfd_udp_session_dump_entry()
186         self.assert_equal(
187             session.desired_min_tx, s.desired_min_tx, "desired min transmit interval"
188         )
189         self.assert_equal(
190             session.required_min_rx, s.required_min_rx, "required min receive interval"
191         )
192         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
193
194     def test_add_sha1_keys(self):
195         """add SHA1 keys"""
196         key_count = 10
197         keys = [self.factory.create_random_key(self) for i in range(0, key_count)]
198         for key in keys:
199             self.assertFalse(key.query_vpp_config())
200         for key in keys:
201             key.add_vpp_config()
202         for key in keys:
203             self.assertTrue(key.query_vpp_config())
204         # remove randomly
205         indexes = list(range(key_count))
206         shuffle(indexes)
207         removed = []
208         for i in indexes:
209             key = keys[i]
210             key.remove_vpp_config()
211             removed.append(i)
212             for j in range(key_count):
213                 key = keys[j]
214                 if j in removed:
215                     self.assertFalse(key.query_vpp_config())
216                 else:
217                     self.assertTrue(key.query_vpp_config())
218         # should be removed now
219         for key in keys:
220             self.assertFalse(key.query_vpp_config())
221         # add back and remove again
222         for key in keys:
223             key.add_vpp_config()
224         for key in keys:
225             self.assertTrue(key.query_vpp_config())
226         for key in keys:
227             key.remove_vpp_config()
228         for key in keys:
229             self.assertFalse(key.query_vpp_config())
230
231     def test_add_bfd_sha1(self):
232         """create a BFD session (SHA1)"""
233         key = self.factory.create_random_key(self)
234         key.add_vpp_config()
235         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
236         session.add_vpp_config()
237         self.logger.debug("Session state is %s", session.state)
238         session.remove_vpp_config()
239         session.add_vpp_config()
240         self.logger.debug("Session state is %s", session.state)
241         session.remove_vpp_config()
242
243     def test_double_add_sha1(self):
244         """create the same BFD session twice (negative case) (SHA1)"""
245         key = self.factory.create_random_key(self)
246         key.add_vpp_config()
247         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
248         session.add_vpp_config()
249         with self.assertRaises(Exception):
250             session.add_vpp_config()
251
252     def test_add_auth_nonexistent_key(self):
253         """create BFD session using non-existent SHA1 (negative case)"""
254         session = VppBFDUDPSession(
255             self,
256             self.pg0,
257             self.pg0.remote_ip4,
258             sha1_key=self.factory.create_random_key(self),
259         )
260         with self.assertRaises(Exception):
261             session.add_vpp_config()
262
263     def test_shared_sha1_key(self):
264         """single SHA1 key shared by multiple BFD sessions"""
265         key = self.factory.create_random_key(self)
266         key.add_vpp_config()
267         sessions = [
268             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=key),
269             VppBFDUDPSession(
270                 self, self.pg0, self.pg0.remote_ip6, sha1_key=key, af=AF_INET6
271             ),
272             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4, sha1_key=key),
273             VppBFDUDPSession(
274                 self, self.pg1, self.pg1.remote_ip6, sha1_key=key, af=AF_INET6
275             ),
276         ]
277         for s in sessions:
278             s.add_vpp_config()
279         removed = 0
280         for s in sessions:
281             e = key.get_bfd_auth_keys_dump_entry()
282             self.assert_equal(
283                 e.use_count, len(sessions) - removed, "Use count for shared key"
284             )
285             s.remove_vpp_config()
286             removed += 1
287         e = key.get_bfd_auth_keys_dump_entry()
288         self.assert_equal(
289             e.use_count, len(sessions) - removed, "Use count for shared key"
290         )
291
292     def test_activate_auth(self):
293         """activate SHA1 authentication"""
294         key = self.factory.create_random_key(self)
295         key.add_vpp_config()
296         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
297         session.add_vpp_config()
298         session.activate_auth(key)
299
300     def test_deactivate_auth(self):
301         """deactivate SHA1 authentication"""
302         key = self.factory.create_random_key(self)
303         key.add_vpp_config()
304         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
305         session.add_vpp_config()
306         session.activate_auth(key)
307         session.deactivate_auth()
308
309     def test_change_key(self):
310         """change SHA1 key"""
311         key1 = self.factory.create_random_key(self)
312         key2 = self.factory.create_random_key(self)
313         while key2.conf_key_id == key1.conf_key_id:
314             key2 = self.factory.create_random_key(self)
315         key1.add_vpp_config()
316         key2.add_vpp_config()
317         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4, sha1_key=key1)
318         session.add_vpp_config()
319         session.activate_auth(key2)
320
    def test_set_del_udp_echo_source(self):
        """set/del udp echo source"""
        # Phase 0: a fresh loopback exists but no echo source is configured yet
        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertFalse(echo_source.is_set)
        self.assertFalse(echo_source.have_usable_ip4)
        self.assertFalse(echo_source.have_usable_ip6)

        # Phase 1: loopback set as echo source, but it has no addresses yet
        self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertTrue(echo_source.is_set)
        self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
        self.assertFalse(echo_source.have_usable_ip4)
        self.assertFalse(echo_source.have_usable_ip6)

        # Phase 2: after IPv4 config, the reported echo address is expected to be
        # the loopback's local IPv4 address with its lowest bit flipped (XOR 1)
        self.loopback0.config_ip4()
        echo_ip4 = ipaddress.IPv4Address(
            int(ipaddress.IPv4Address(self.loopback0.local_ip4)) ^ 1
        ).packed
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertTrue(echo_source.is_set)
        self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
        self.assertTrue(echo_source.have_usable_ip4)
        self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
        self.assertFalse(echo_source.have_usable_ip6)

        # Phase 3: same derivation for IPv6; the IPv4 result must stay intact
        self.loopback0.config_ip6()
        echo_ip6 = ipaddress.IPv6Address(
            int(ipaddress.IPv6Address(self.loopback0.local_ip6)) ^ 1
        ).packed

        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertTrue(echo_source.is_set)
        self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
        self.assertTrue(echo_source.have_usable_ip4)
        self.assertEqual(echo_source.ip4_addr.packed, echo_ip4)
        self.assertTrue(echo_source.have_usable_ip6)
        self.assertEqual(echo_source.ip6_addr.packed, echo_ip6)

        # Phase 4: deleting the echo source returns everything to the unset state
        self.vapi.bfd_udp_del_echo_source()
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertFalse(echo_source.is_set)
        self.assertFalse(echo_source.have_usable_ip4)
        self.assertFalse(echo_source.have_usable_ip6)
367
368
369 class BFDTestSession(object):
370     """BFD session as seen from test framework side"""
371
372     def __init__(
373         self,
374         test,
375         interface,
376         af,
377         detect_mult=3,
378         sha1_key=None,
379         bfd_key_id=None,
380         our_seq_number=None,
381         tunnel_header=None,
382         phy_interface=None,
383     ):
384         self.test = test
385         self.af = af
386         self.sha1_key = sha1_key
387         self.bfd_key_id = bfd_key_id
388         self.interface = interface
389         if phy_interface:
390             self.phy_interface = phy_interface
391         else:
392             self.phy_interface = self.interface
393         self.udp_sport = randint(49152, 65535)
394         if our_seq_number is None:
395             self.our_seq_number = randint(0, 40000000)
396         else:
397             self.our_seq_number = our_seq_number
398         self.vpp_seq_number = None
399         self.my_discriminator = 0
400         self.desired_min_tx = 300000
401         self.required_min_rx = 300000
402         self.required_min_echo_rx = None
403         self.detect_mult = detect_mult
404         self.diag = BFDDiagCode.no_diagnostic
405         self.your_discriminator = None
406         self.state = BFDState.down
407         self.auth_type = BFDAuthType.no_auth
408         self.tunnel_header = tunnel_header
409         self.tx_packets = 0
410         self.rx_packets = 0
411         self.tx_packets_echo = 0
412         self.rx_packets_echo = 0
413
414     def inc_seq_num(self):
415         """increment sequence number, wrapping if needed"""
416         if self.our_seq_number == 0xFFFFFFFF:
417             self.our_seq_number = 0
418         else:
419             self.our_seq_number += 1
420
421     def update(
422         self,
423         my_discriminator=None,
424         your_discriminator=None,
425         desired_min_tx=None,
426         required_min_rx=None,
427         required_min_echo_rx=None,
428         detect_mult=None,
429         diag=None,
430         state=None,
431         auth_type=None,
432     ):
433         """update BFD parameters associated with session"""
434         if my_discriminator is not None:
435             self.my_discriminator = my_discriminator
436         if your_discriminator is not None:
437             self.your_discriminator = your_discriminator
438         if required_min_rx is not None:
439             self.required_min_rx = required_min_rx
440         if required_min_echo_rx is not None:
441             self.required_min_echo_rx = required_min_echo_rx
442         if desired_min_tx is not None:
443             self.desired_min_tx = desired_min_tx
444         if detect_mult is not None:
445             self.detect_mult = detect_mult
446         if diag is not None:
447             self.diag = diag
448         if state is not None:
449             self.state = state
450         if auth_type is not None:
451             self.auth_type = auth_type
452
453     def fill_packet_fields(self, packet):
454         """set packet fields with known values in packet"""
455         bfd = packet[BFD]
456         if self.my_discriminator:
457             self.test.logger.debug(
458                 "BFD: setting packet.my_discriminator=%s", self.my_discriminator
459             )
460             bfd.my_discriminator = self.my_discriminator
461         if self.your_discriminator:
462             self.test.logger.debug(
463                 "BFD: setting packet.your_discriminator=%s", self.your_discriminator
464             )
465             bfd.your_discriminator = self.your_discriminator
466         if self.required_min_rx:
467             self.test.logger.debug(
468                 "BFD: setting packet.required_min_rx_interval=%s", self.required_min_rx
469             )
470             bfd.required_min_rx_interval = self.required_min_rx
471         if self.required_min_echo_rx:
472             self.test.logger.debug(
473                 "BFD: setting packet.required_min_echo_rx=%s", self.required_min_echo_rx
474             )
475             bfd.required_min_echo_rx_interval = self.required_min_echo_rx
476         if self.desired_min_tx:
477             self.test.logger.debug(
478                 "BFD: setting packet.desired_min_tx_interval=%s", self.desired_min_tx
479             )
480             bfd.desired_min_tx_interval = self.desired_min_tx
481         if self.detect_mult:
482             self.test.logger.debug(
483                 "BFD: setting packet.detect_mult=%s", self.detect_mult
484             )
485             bfd.detect_mult = self.detect_mult
486         if self.diag:
487             self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
488             bfd.diag = self.diag
489         if self.state:
490             self.test.logger.debug("BFD: setting packet.state=%s", self.state)
491             bfd.state = self.state
492         if self.auth_type:
493             # this is used by a negative test-case
494             self.test.logger.debug("BFD: setting packet.auth_type=%s", self.auth_type)
495             bfd.auth_type = self.auth_type
496
497     def create_packet(self):
498         """create a BFD packet, reflecting the current state of session"""
499         if self.sha1_key:
500             bfd = BFD(flags="A")
501             bfd.auth_type = self.sha1_key.auth_type
502             bfd.auth_len = BFD.sha1_auth_len
503             bfd.auth_key_id = self.bfd_key_id
504             bfd.auth_seq_num = self.our_seq_number
505             bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
506         else:
507             bfd = BFD()
508         packet = Ether(
509             src=self.phy_interface.remote_mac, dst=self.phy_interface.local_mac
510         )
511         if self.tunnel_header:
512             packet = packet / self.tunnel_header
513         if self.af == AF_INET6:
514             packet = (
515                 packet
516                 / IPv6(
517                     src=self.interface.remote_ip6,
518                     dst=self.interface.local_ip6,
519                     hlim=255,
520                 )
521                 / UDP(sport=self.udp_sport, dport=BFD.udp_dport)
522                 / bfd
523             )
524         else:
525             packet = (
526                 packet
527                 / IP(
528                     src=self.interface.remote_ip4, dst=self.interface.local_ip4, ttl=255
529                 )
530                 / UDP(sport=self.udp_sport, dport=BFD.udp_dport)
531                 / bfd
532             )
533         self.test.logger.debug("BFD: Creating packet")
534         self.fill_packet_fields(packet)
535         if self.sha1_key:
536             hash_material = (
537                 scapy.compat.raw(packet[BFD])[:32]
538                 + self.sha1_key.key
539                 + b"\0" * (20 - len(self.sha1_key.key))
540             )
541             self.test.logger.debug(
542                 "BFD: Calculated SHA1 hash: %s"
543                 % hashlib.sha1(hash_material).hexdigest()
544             )
545             packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
546         return packet
547
548     def send_packet(self, packet=None, interface=None):
549         """send packet on interface, creating the packet if needed"""
550         if packet is None:
551             packet = self.create_packet()
552         if interface is None:
553             interface = self.phy_interface
554         self.test.logger.debug(ppp("Sending packet:", packet))
555         interface.add_stream(packet)
556         self.tx_packets += 1
557         self.test.pg_start()
558
    def verify_sha1_auth(self, packet):
        """Verify correctness of authentication in BFD layer.

        Checks the auth section header fields, the progression of the
        sequence number relative to the previously received one (tracked in
        self.vpp_seq_number, which is updated here) and the SHA1 hash.

        :param packet: received packet containing a BFD layer
        """
        bfd = packet[BFD]
        # NOTE(review): 28 is presumably the SHA1 auth section length, i.e. the
        # same value as BFD.sha1_auth_len used by create_packet - confirm
        self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
        self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type, BFDAuthType)
        self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
        if self.vpp_seq_number is None:
            # first authenticated packet: nothing to compare against, just record
            self.vpp_seq_number = bfd.auth_seq_num
            self.test.logger.debug(
                "Received initial sequence number: %s" % self.vpp_seq_number
            )
        else:
            recvd_seq_num = bfd.auth_seq_num
            self.test.logger.debug(
                "Received followup sequence number: %s" % recvd_seq_num
            )
            if self.vpp_seq_number < 0xFFFFFFFF:
                # no wrap-around possible yet
                if self.sha1_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
                    # meticulous mode: exactly previous + 1
                    self.test.assert_equal(
                        recvd_seq_num, self.vpp_seq_number + 1, "BFD sequence number"
                    )
                else:
                    # non-meticulous mode: previous .. previous + 1 is accepted
                    # (assumes assert_in_range bounds are inclusive - TODO confirm)
                    self.test.assert_in_range(
                        recvd_seq_num,
                        self.vpp_seq_number,
                        self.vpp_seq_number + 1,
                        "BFD sequence number",
                    )
            else:
                # previous number was the 32-bit maximum: the successor wraps to 0
                if self.sha1_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
                    self.test.assert_equal(recvd_seq_num, 0, "BFD sequence number")
                else:
                    self.test.assertIn(
                        recvd_seq_num,
                        (self.vpp_seq_number, 0),
                        "BFD sequence number not one of "
                        "(%s, 0)" % self.vpp_seq_number,
                    )
            self.vpp_seq_number = recvd_seq_num
        # last 20 bytes represent the hash - so replace them with the key,
        # pad the result with zeros and hash the result
        hash_material = (
            bfd.original[:-20]
            + self.sha1_key.key
            + b"\0" * (20 - len(self.sha1_key.key))
        )
        expected_hash = hashlib.sha1(hash_material).hexdigest()
        # compare as hex-encoded bytes on both sides
        self.test.assert_equal(
            binascii.hexlify(bfd.auth_key_hash), expected_hash.encode(), "Auth key hash"
        )
610
611     def verify_bfd(self, packet):
612         """Verify correctness of BFD layer."""
613         bfd = packet[BFD]
614         self.test.assert_equal(bfd.version, 1, "BFD version")
615         self.test.assert_equal(
616             bfd.your_discriminator, self.my_discriminator, "BFD - your discriminator"
617         )
618         if self.sha1_key:
619             self.verify_sha1_auth(packet)
620
621
def bfd_session_up(test):
    """Bring BFD session up

    Waits for a BFD packet from VPP, derives the VPP clock offset from its
    timestamp, replies with an Init packet, expects the session "up" event
    and finally sends an Up packet from the test side.

    :param test: test case providing vpp_session, test_session, vapi, logger
    """
    test.logger.info("BFD: Waiting for slow hello")
    p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
    old_offset = getattr(test, "vpp_clock_offset", None)
    test.vpp_clock_offset = time.time() - float(p.time)
    test.logger.debug("BFD: Calculated vpp clock offset: %s", test.vpp_clock_offset)
    # compare against None explicitly: a previously measured offset of 0.0 is
    # still a valid measurement and must be checked for stability as well
    if old_offset is not None:
        test.assertAlmostEqual(
            old_offset,
            test.vpp_clock_offset,
            delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)"
            % (test.vpp_clock_offset, old_offset),
        )
    test.logger.info("BFD: Sending Init")
    test.test_session.update(
        my_discriminator=randint(0, 40000000),
        your_discriminator=p[BFD].my_discriminator,
        state=BFDState.init,
    )
    # meticulous auth requires a fresh sequence number for every packet sent
    if (
        test.test_session.sha1_key
        and test.test_session.sha1_key.auth_type == BFDAuthType.meticulous_keyed_sha1
    ):
        test.test_session.inc_seq_num()
    test.test_session.send_packet()
    test.logger.info("BFD: Waiting for event")
    e = test.vapi.wait_for_event(1, "bfd_udp_session_event")
    verify_event(test, e, expected_state=BFDState.up)
    test.logger.info("BFD: Session is Up")
    test.test_session.update(state=BFDState.up)
    if (
        test.test_session.sha1_key
        and test.test_session.sha1_key.auth_type == BFDAuthType.meticulous_keyed_sha1
    ):
        test.test_session.inc_seq_num()
    test.test_session.send_packet()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
663
664
def bfd_session_down(test):
    """Take an Up BFD session down by sending a Down packet from the test side

    :param test: test case providing vpp_session, test_session, vapi, logger
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    peer = test.test_session
    peer.update(state=BFDState.down)
    # meticulous auth requires a fresh sequence number for every packet sent
    auth_key = peer.sha1_key
    if auth_key and auth_key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        peer.inc_seq_num()
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_event")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
680
681
def verify_bfd_session_config(test, session, state=None):
    """Verify that VPP's dump of a BFD session matches the session object

    :param test: test case providing assertIsNotNone / assert_equal
    :param session: session object exposing get_bfd_udp_session_dump_entry()
        and the expected parameter values
    :param state: expected session state; the state is only checked when
        this is not None
    """
    dump = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(dump)
    # since dump is not none, we have verified that sw_if_index and addresses
    # are valid (in get_bfd_udp_session_dump_entry)
    # compare against None explicitly so that a state whose numeric value is 0
    # (presumably AdminDown, per RFC 5880) is still verified
    if state is not None:
        test.assert_equal(dump.state, state, "session state")
    test.assert_equal(
        dump.required_min_rx, session.required_min_rx, "required min rx interval"
    )
    test.assert_equal(
        dump.desired_min_tx, session.desired_min_tx, "desired min tx interval"
    )
    test.assert_equal(dump.detect_mult, session.detect_mult, "detect multiplier")
    if session.sha1_key is None:
        test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
    else:
        test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
        test.assert_equal(dump.bfd_key_id, session.bfd_key_id, "bfd key id")
        test.assert_equal(
            dump.conf_key_id, session.sha1_key.conf_key_id, "config key id"
        )
704
705
def verify_ip(test, packet):
    """Check TTL / hop limit and addresses of the packet's IP layer

    The packet is expected to come from VPP: source is the interface's
    local address, destination its remote address.
    """
    interface = test.vpp_session.interface
    if test.vpp_session.af == AF_INET6:
        ip_layer = packet[IPv6]
        expected_src, expected_dst = interface.local_ip6, interface.remote_ip6
        test.assert_equal(ip_layer.hlim, 255, "IPv6 hop limit")
    else:
        ip_layer = packet[IP]
        expected_src, expected_dst = interface.local_ip4, interface.remote_ip4
        test.assert_equal(ip_layer.ttl, 255, "IPv4 TTL")
    test.assert_equal(ip_layer.src, expected_src, "IP source address")
    test.assert_equal(ip_layer.dst, expected_dst, "IP destination address")
720
721
def verify_udp(test, packet):
    """Check that the UDP ports of the packet are valid for BFD"""
    udp_layer = packet[UDP]
    test.assert_equal(udp_layer.dport, BFD.udp_dport, "UDP destination port")
    # the source port only has to fall inside the range defined on BFD
    lowest, highest = BFD.udp_sport_min, BFD.udp_sport_max
    test.assert_in_range(udp_layer.sport, lowest, highest, "UDP source port")
729
730
def verify_event(test, event, expected_state):
    """Verify correctness of event values.

    :param test: test case providing vpp_session, logger and assert_equal
    :param event: received session event to check
    :param expected_state: state the event must report
    """
    test.logger.debug("BFD: Event: %s" % reprlib.repr(event))
    test.assert_equal(
        event.sw_if_index,
        test.vpp_session.interface.sw_if_index,
        "BFD interface index",
    )
    # the messages are address-family neutral: this helper also checks
    # events of IPv4 sessions
    test.assert_equal(
        str(event.local_addr), test.vpp_session.local_addr, "Local IP address"
    )
    test.assert_equal(
        str(event.peer_addr), test.vpp_session.peer_addr, "Peer IP address"
    )
    test.assert_equal(event.state, expected_state, BFDState)
744
745
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
    """wait for BFD packet and verify its correctness

    :param test: test case providing pg0, test_session, vapi and logger
    :param timeout: how long to wait
    :param pcap_time_min: ignore packets with pcap timestamp lower than this
    :param is_tunnel: when set, the outer IP layer is stripped and the
        payload of that layer is verified and returned instead

    :returns: the received packet (the inner packet when is_tunnel is set)
    :raises CaptureTimeoutError: if the deadline passes without a usable packet
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.test_session.rx_packets += 1
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is not None and p.time < pcap_time_min:
            test.logger.debug(
                ppp(
                    "BFD: ignoring packet (pcap time %s < "
                    "pcap time min %s):" % (p.time, pcap_time_min),
                    p,
                )
            )
        else:
            break
    test.logger.debug(test.vapi.ppcli("show trace"))
    if is_tunnel:
        # strip an IP layer and move to the next
        p = p[IP].payload

    # getlayer() yields None for a missing layer, whereas p[BFD] would raise
    # IndexError and make the check below unreachable
    bfd = p.getlayer(BFD)
    if bfd is None:
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
791
792
# Per-session BFD packet counters: control and echo packets, both directions.
BFDStats = namedtuple("BFDStats", ["rx", "rx_echo", "tx", "tx_echo"])
794
795
def bfd_grab_stats_snapshot(test, bs_idx=0, thread_index=None):
    """Read the current BFD packet counters for one session.

    :param test: test case exposing the ``statistics`` mapping
    :param bs_idx: second index applied to every counter
    :param thread_index: first index applied to every counter; ``None``
        selects the whole first axis (equivalent to ``[:, bs_idx]``)

    :returns: BFDStats holding ``sum_packets()`` of each selected counter
    """
    stats = test.statistics
    # a bare ":" inside a subscript is the same object as slice(None)
    first_axis = slice(None) if thread_index is None else thread_index

    def _packets(path):
        return stats[path][first_axis, bs_idx].sum_packets()

    return BFDStats(
        _packets("/bfd/rx-session-counters"),
        _packets("/bfd/rx-session-echo-counters"),
        _packets("/bfd/tx-session-counters"),
        _packets("/bfd/tx-session-echo-counters"),
    )
810
811
def bfd_stats_diff(stats_before, stats_after):
    """Return the per-counter difference ``stats_after - stats_before``.

    :param stats_before: earlier snapshot with rx, rx_echo, tx, tx_echo
    :param stats_after: later snapshot with the same attributes

    :returns: BFDStats whose fields are the individual differences
    """
    return BFDStats(
        rx=stats_after.rx - stats_before.rx,
        rx_echo=stats_after.rx_echo - stats_before.rx_echo,
        tx=stats_after.tx - stats_before.tx,
        tx_echo=stats_after.tx_echo - stats_before.tx_echo,
    )
818
819
820 @tag_run_solo
821 class BFD4TestCase(VppTestCase):
822     """Bidirectional Forwarding Detection (BFD)"""
823
824     pg0 = None
825     vpp_clock_offset = None
826     vpp_session = None
827     test_session = None
828
829     @classmethod
830     def setUpClass(cls):
831         super(BFD4TestCase, cls).setUpClass()
832         cls.vapi.cli("set log class bfd level debug")
833         try:
834             cls.create_pg_interfaces([0])
835             cls.create_loopback_interfaces(1)
836             cls.loopback0 = cls.lo_interfaces[0]
837             cls.loopback0.config_ip4()
838             cls.loopback0.admin_up()
839             cls.pg0.config_ip4()
840             cls.pg0.configure_ipv4_neighbors()
841             cls.pg0.admin_up()
842             cls.pg0.resolve_arp()
843
844         except Exception:
845             super(BFD4TestCase, cls).tearDownClass()
846             raise
847
848     @classmethod
849     def tearDownClass(cls):
850         super(BFD4TestCase, cls).tearDownClass()
851
852     def setUp(self):
853         super(BFD4TestCase, self).setUp()
854         self.factory = AuthKeyFactory()
855         self.vapi.want_bfd_events()
856         self.pg0.enable_capture()
857         try:
858             self.bfd_udp4_sessions = self.statistics["/bfd/udp4/sessions"]
859             self.bfd_udp6_sessions = self.statistics["/bfd/udp6/sessions"]
860             self.vapi.cli("trace add bfd-process 500")
861             self.vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
862             self.vpp_session.add_vpp_config()
863             self.vpp_session.admin_up()
864             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
865         except BaseException:
866             self.vapi.want_bfd_events(enable_disable=0)
867             raise
868
869     def tearDown(self):
870         if not self.vpp_dead:
871             self.vapi.want_bfd_events(enable_disable=0)
872         self.vapi.collect_events()  # clear the event queue
873         super(BFD4TestCase, self).tearDown()
874
875     def test_session_up(self):
876         """bring BFD session up"""
877         bfd_session_up(self)
878         bfd_udp4_sessions = self.statistics["/bfd/udp4/sessions"]
879         bfd_udp6_sessions = self.statistics["/bfd/udp6/sessions"]
880         self.assert_equal(bfd_udp4_sessions - self.bfd_udp4_sessions, 1)
881         self.assert_equal(bfd_udp6_sessions, self.bfd_udp6_sessions)
882
883     def test_session_up_by_ip(self):
884         """bring BFD session up - first frame looked up by address pair"""
885         self.logger.info("BFD: Sending Slow control frame")
886         self.test_session.update(my_discriminator=randint(0, 40000000))
887         self.test_session.send_packet()
888         self.pg0.enable_capture()
889         p = self.pg0.wait_for_packet(1)
890         self.assert_equal(
891             p[BFD].your_discriminator,
892             self.test_session.my_discriminator,
893             "BFD - your discriminator",
894         )
895         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
896         self.test_session.update(
897             your_discriminator=p[BFD].my_discriminator, state=BFDState.up
898         )
899         self.logger.info("BFD: Waiting for event")
900         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
901         verify_event(self, e, expected_state=BFDState.init)
902         self.logger.info("BFD: Sending Up")
903         self.test_session.send_packet()
904         self.logger.info("BFD: Waiting for event")
905         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
906         verify_event(self, e, expected_state=BFDState.up)
907         self.logger.info("BFD: Session is Up")
908         self.test_session.update(state=BFDState.up)
909         self.test_session.send_packet()
910         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
911
912     def test_session_down(self):
913         """bring BFD session down"""
914         bfd_session_up(self)
915         bfd_session_down(self)
916
917     def test_hold_up(self):
918         """hold BFD session up"""
919         bfd_session_up(self)
920         for dummy in range(self.test_session.detect_mult * 2):
921             wait_for_bfd_packet(self)
922             self.test_session.send_packet()
923         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
924
925     def test_slow_timer(self):
926         """verify slow periodic control frames while session down"""
927         packet_count = 3
928         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
929         prev_packet = wait_for_bfd_packet(self, 2)
930         for dummy in range(packet_count):
931             next_packet = wait_for_bfd_packet(self, 2)
932             time_diff = next_packet.time - prev_packet.time
933             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
934             # to work around timing issues
935             self.assert_in_range(time_diff, 0.70, 1.05, "time between slow packets")
936             prev_packet = next_packet
937
938     def test_zero_remote_min_rx(self):
939         """no packets when zero remote required min rx interval"""
940         bfd_session_up(self)
941         self.test_session.update(required_min_rx=0)
942         self.test_session.send_packet()
943         for dummy in range(self.test_session.detect_mult):
944             self.sleep(
945                 self.vpp_session.required_min_rx / USEC_IN_SEC,
946                 "sleep before transmitting bfd packet",
947             )
948             self.test_session.send_packet()
949             try:
950                 p = wait_for_bfd_packet(self, timeout=0)
951                 self.logger.error(ppp("Received unexpected packet:", p))
952             except CaptureTimeoutError:
953                 pass
954         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
955         self.test_session.update(required_min_rx=300000)
956         for dummy in range(3):
957             self.test_session.send_packet()
958             wait_for_bfd_packet(
959                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC
960             )
961         self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
962
963     def test_conn_down(self):
964         """verify session goes down after inactivity"""
965         bfd_session_up(self)
966         detection_time = (
967             self.test_session.detect_mult
968             * self.vpp_session.required_min_rx
969             / USEC_IN_SEC
970         )
971         self.sleep(detection_time, "waiting for BFD session time-out")
972         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
973         verify_event(self, e, expected_state=BFDState.down)
974
975     def test_peer_discr_reset_sess_down(self):
976         """peer discriminator reset after session goes down"""
977         bfd_session_up(self)
978         detection_time = (
979             self.test_session.detect_mult
980             * self.vpp_session.required_min_rx
981             / USEC_IN_SEC
982         )
983         self.sleep(detection_time, "waiting for BFD session time-out")
984         self.test_session.my_discriminator = 0
985         wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
986
    def test_large_required_min_rx(self):
        """large remote required min rx interval

        After the test session advertises a 3000000 usec required min rx,
        poll for packets and events for up to that long. The first event
        collected (if any) must report Down, and no packet stamped after the
        update may have been captured.
        """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        interval = 3000000  # usec, converted with USEC_IN_SEC below
        self.test_session.update(required_min_rx=interval)
        self.test_session.send_packet()
        time_mark = time.time()
        count = 0  # packets captured although VPP should have stayed silent
        # busy wait here, trying to collect a packet or event, vpp is not
        # allowed to send packets and the session will timeout first - so the
        # Up->Down event must arrive before any packets do
        while time.time() < time_mark + interval / USEC_IN_SEC:
            try:
                p = wait_for_bfd_packet(self, timeout=0)
                # if vpp managed to send a packet before we did the session
                # session update, then that's fine, ignore it
                if p.time < time_mark - self.vpp_clock_offset:
                    continue
                self.logger.error(ppp("Received unexpected packet:", p))
                count += 1
            except CaptureTimeoutError:
                pass
            events = self.vapi.collect_events()
            if len(events) > 0:
                # only the first collected event is checked, then polling stops
                verify_event(self, events[0], BFDState.down)
                break
        self.assert_equal(count, 0, "number of packets received")
1015
    def test_immediate_remote_min_rx_reduction(self):
        """immediately honor remote required min rx reduction

        Replaces the VPP session with one using desired_min_tx=10000, brings
        it up with a 1000000 usec remote required min rx, then lowers that to
        300000 usec and checks the spacing of the following packets.
        """
        # replace the session created in setUp with a differently configured one
        self.vpp_session.remove_vpp_config()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000
        )
        self.pg0.enable_capture()
        self.vpp_session.add_vpp_config()
        self.test_session.update(desired_min_tx=1000000, required_min_rx=1000000)
        bfd_session_up(self)
        reference_packet = wait_for_bfd_packet(self)
        time_mark = time.time()
        interval = 300000  # usec, converted with USEC_IN_SEC below
        self.test_session.update(required_min_rx=interval)
        self.test_session.send_packet()
        # time spent between the reference packet and sending the update
        extra_time = time.time() - time_mark
        p = wait_for_bfd_packet(self)
        # first packet is allowed to be late by time we spent doing the update
        # calculated in extra_time
        self.assert_in_range(
            p.time - reference_packet.time,
            0.95 * 0.75 * interval / USEC_IN_SEC,
            1.05 * interval / USEC_IN_SEC + extra_time,
            "time between BFD packets",
        )
        reference_packet = p
        # the next three gaps get the same bounds without the extra allowance
        for dummy in range(3):
            p = wait_for_bfd_packet(self)
            diff = p.time - reference_packet.time
            self.assert_in_range(
                diff,
                0.95 * 0.75 * interval / USEC_IN_SEC,
                1.05 * interval / USEC_IN_SEC,
                "time between BFD packets",
            )
            reference_packet = p
1052
    def test_modify_req_min_rx_double(self):
        """modify session - double required min rx

        After the poll sequence triggered by the change is answered with a
        Final frame, the test goes silent and expects the Down event within
        10% of detect_mult * max(test desired_min_tx, VPP required_min_rx).
        """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        self.test_session.update(desired_min_tx=10000, required_min_rx=10000)
        self.test_session.send_packet()
        # double required min rx
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx
        )
        # only accept a packet stamped after the modification
        p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet")
        # finish poll sequence with final packet
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        # expected time to the Down event, in seconds
        timeout = (
            self.test_session.detect_mult
            * max(self.test_session.desired_min_tx, self.vpp_session.required_min_rx)
            / USEC_IN_SEC
        )
        self.test_session.send_packet(final)
        # nothing more is sent from here on; measure how long Down takes
        time_mark = time.time()
        e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_event")
        verify_event(self, e, expected_state=BFDState.down)
        time_to_event = time.time() - time_mark
        self.assert_in_range(
            time_to_event, 0.9 * timeout, 1.1 * timeout, "session timeout"
        )
1082
    def test_modify_req_min_rx_halve(self):
        """modify session - halve required min rx

        Starts from a doubled required min rx, halves it while the session is
        up, checks that no event is raised while the poll sequence is pending,
        and finally expects the Down event within 10% of the detection time
        computed from the VPP session's required_min_rx at that point.
        """
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx
        )
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        self.test_session.update(desired_min_tx=10000, required_min_rx=10000)
        self.test_session.send_packet()
        p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
        # halve required min rx
        old_required_min_rx = self.vpp_session.required_min_rx
        self.vpp_session.modify_parameters(
            required_min_rx=self.vpp_session.required_min_rx // 2
        )
        # now we wait 0.8*3*old-req-min-rx and the session should still be up
        # (the multiplier actually used is self.vpp_session.detect_mult)
        self.sleep(
            0.8 * self.vpp_session.detect_mult * old_required_min_rx / USEC_IN_SEC,
            "wait before finishing poll sequence",
        )
        self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
        p = wait_for_bfd_packet(self)
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet")
        # finish poll sequence with final packet
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        # now the session should time out under new conditions
        detection_time = (
            self.test_session.detect_mult
            * self.vpp_session.required_min_rx
            / USEC_IN_SEC
        )
        # measure how long the Down event takes once the test goes silent
        before = time.time()
        e = self.vapi.wait_for_event(2 * detection_time, "bfd_udp_session_event")
        after = time.time()
        self.assert_in_range(
            after - before,
            0.9 * detection_time,
            1.1 * detection_time,
            "time before bfd session goes down",
        )
        verify_event(self, e, expected_state=BFDState.down)
1127
1128     def test_modify_detect_mult(self):
1129         """modify detect multiplier"""
1130         bfd_session_up(self)
1131         p = wait_for_bfd_packet(self)
1132         self.vpp_session.modify_parameters(detect_mult=1)
1133         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1134         self.assert_equal(
1135             self.vpp_session.detect_mult, p[BFD].detect_mult, "detect mult"
1136         )
1137         # poll bit must not be set
1138         self.assertNotIn(
1139             "P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet"
1140         )
1141         self.vpp_session.modify_parameters(detect_mult=10)
1142         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1143         self.assert_equal(
1144             self.vpp_session.detect_mult, p[BFD].detect_mult, "detect mult"
1145         )
1146         # poll bit must not be set
1147         self.assertNotIn(
1148             "P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet"
1149         )
1150
    def test_queued_poll(self):
        """test poll sequence queueing

        Changes required min rx twice in a row. The first change starts a
        poll sequence which is deliberately left unanswered for at least
        poll_sequence_length_min seconds; the second change is expected to be
        announced only in a later, separate poll sequence.
        """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx
        )
        p = wait_for_bfd_packet(self)
        poll_sequence_start = time.time()
        poll_sequence_length_min = 0.5  # seconds
        send_final_after = time.time() + poll_sequence_length_min
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet")
        self.assert_equal(
            p[BFD].required_min_rx_interval,
            self.vpp_session.required_min_rx,
            "BFD required min rx interval",
        )
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx
        )
        # 2nd poll sequence should be queued now
        # don't send the reply back yet, wait for some time to emulate
        # longer round-trip time
        packet_count = 0  # packets received while the 1st poll was pending
        while time.time() < send_final_after:
            self.test_session.send_packet()
            p = wait_for_bfd_packet(self)
            self.assert_equal(
                len(self.vapi.collect_events()), 0, "number of bfd events"
            )
            self.assert_equal(
                p[BFD].required_min_rx_interval,
                self.vpp_session.required_min_rx,
                "BFD required min rx interval",
            )
            packet_count += 1
            # poll bit must be set
            self.assertIn(
                "P", p.sprintf("%BFD.flags%"), "Poll bit not set in BFD packet"
            )
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        # finish 1st with final
        poll_sequence_length = time.time() - poll_sequence_start
        # vpp must wait for some time before starting new poll sequence
        poll_no_2_started = False
        for dummy in range(2 * packet_count):
            p = wait_for_bfd_packet(self)
            self.assert_equal(
                len(self.vapi.collect_events()), 0, "number of bfd events"
            )
            if "P" in p.sprintf("%BFD.flags%"):
                poll_no_2_started = True
                # NOTE(review): poll_sequence_start + poll_sequence_length is
                # the moment the first Final was sent, which already lies in
                # the past here, so this guard looks unable to fire - confirm
                # the intended minimum delay.
                if time.time() < poll_sequence_start + poll_sequence_length:
                    raise Exception("VPP started 2nd poll sequence too soon")
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
                break
            else:
                self.test_session.send_packet()
        self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
        # finish 2nd with final
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        p = wait_for_bfd_packet(self)
        # poll bit must not be set
        self.assertNotIn("P", p.sprintf("%BFD.flags%"), "Poll bit set in BFD packet")
1222
1223     # returning inconsistent results requiring retries in per-patch tests
1224     @unittest.skipUnless(config.extended, "part of extended tests")
1225     def test_poll_response(self):
1226         """test correct response to control frame with poll bit set"""
1227         bfd_session_up(self)
1228         poll = self.test_session.create_packet()
1229         poll[BFD].flags = "P"
1230         self.test_session.send_packet(poll)
1231         final = wait_for_bfd_packet(
1232             self, pcap_time_min=time.time() - self.vpp_clock_offset
1233         )
1234         self.assertIn("F", final.sprintf("%BFD.flags%"))
1235
1236     def test_no_periodic_if_remote_demand(self):
1237         """no periodic frames outside poll sequence if remote demand set"""
1238         bfd_session_up(self)
1239         demand = self.test_session.create_packet()
1240         demand[BFD].flags = "D"
1241         self.test_session.send_packet(demand)
1242         transmit_time = (
1243             0.9
1244             * max(self.vpp_session.required_min_rx, self.test_session.desired_min_tx)
1245             / USEC_IN_SEC
1246         )
1247         count = 0
1248         for dummy in range(self.test_session.detect_mult * 2):
1249             self.sleep(transmit_time)
1250             self.test_session.send_packet(demand)
1251             try:
1252                 p = wait_for_bfd_packet(self, timeout=0)
1253                 self.logger.error(ppp("Received unexpected packet:", p))
1254                 count += 1
1255             except CaptureTimeoutError:
1256                 pass
1257         events = self.vapi.collect_events()
1258         for e in events:
1259             self.logger.error("Received unexpected event: %s", e)
1260         self.assert_equal(count, 0, "number of packets received")
1261         self.assert_equal(len(events), 0, "number of events received")
1262
    def test_echo_looped_back(self):
        """echo packets looped back

        Injects UDP packets addressed to the BFD echo port with consecutive
        source ports, expects each one back on pg0 unchanged and in order,
        and then checks the session counters: control frames captured
        meanwhile must match the TX counter, all other counters must not move.
        """
        bfd_session_up(self)
        stats_before = bfd_grab_stats_snapshot(self)
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        # source and destination IP are both pg0.remote_ip4
        echo_packet = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4)
            / UDP(dport=BFD.udp_dport_echo)
            / Raw("this should be looped back")
        )
        for dummy in range(echo_packet_count):
            self.sleep(0.01, "delay between echo packets")
            # the source port doubles as a sequence number for this test
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
            self.logger.debug(self.vapi.ppcli("show trace"))
        counter = 0  # looped-back echo packets verified so far
        bfd_control_packets_rx = 0
        while counter < echo_packet_count:
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac, ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IP]
            self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
            udp = p[UDP]
            if udp.dport == BFD.udp_dport:
                # regular control frame from the session; count it and move on
                bfd_control_packets_rx += 1
                continue
            self.assert_equal(self.pg0.remote_ip4, ip.src, "Source IP")
            self.assert_equal(udp.dport, BFD.udp_dport_echo, "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(
                scapy.compat.raw(p[UDP].payload),
                scapy.compat.raw(echo_packet[UDP].payload),
                "Received packet is not the echo packet sent",
            )
            counter += 1
        self.assert_equal(
            udp_sport_tx,
            udp_sport_rx,
            "UDP source port (== ECHO packet identifier for test purposes)",
        )
        stats_after = bfd_grab_stats_snapshot(self)
        diff = bfd_stats_diff(stats_before, stats_after)
        self.assertEqual(0, diff.rx, "RX counter bumped but no BFD packets sent")
        self.assertEqual(bfd_control_packets_rx, diff.tx, "TX counter incorrect")
        self.assertEqual(
            0, diff.rx_echo, "RX echo counter bumped but no BFD session exists"
        )
        self.assertEqual(
            0, diff.tx_echo, "TX echo counter bumped but no BFD session exists"
        )
1327
    def test_echo(self):
        """echo function

        First checks that control frames keep advertising the configured
        required min rx while no echo source is set, then sets loopback0 as
        echo source, loops every echo packet back to VPP and finally compares
        the test session's packet counts with VPP's session counters.
        """
        stats_before = bfd_grab_stats_snapshot(self)
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = (
            self.test_session.detect_mult
            * self.vpp_session.required_min_rx
            / USEC_IN_SEC
        )
        # echo shouldn't work without echo source set
        for dummy in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(
            p[BFD].required_min_rx_interval,
            self.vpp_session.required_min_rx,
            "BFD required min rx interval",
        )
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for dummy in range(3):
            # process captured packets for 75% of the detection time, then
            # send a control frame of our own (after the inner loop)
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
                    self.assertNotEqual(
                        p[IP].src,
                        self.loopback0.local_ip4,
                        "BFD ECHO src IP equal to loopback IP",
                    )
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(
                        p[Ether].dst,
                        self.pg0.remote_mac,
                        "ECHO packet destination MAC address",
                    )
                    # re-address the frame to VPP and inject it back
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.test_session.rx_packets_echo += 1
                    self.test_session.tx_packets_echo += 1
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    self.test_session.rx_packets += 1
                    if echo_seen:
                        # once echo is running, control frames must advertise
                        # a required min rx of at least 1000000 usec
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval, 1000000
                        )
                    if "P" in p.sprintf("%BFD.flags%"):
                        # answer a poll with a Final frame
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(
                    len(self.vapi.collect_events()), 0, "number of bfd events"
                )
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

        stats_after = bfd_grab_stats_snapshot(self)
        diff = bfd_stats_diff(stats_before, stats_after)
        # our rx is vpp tx and vice versa, also tolerate one packet off
        self.assert_in_range(
            self.test_session.tx_packets, diff.rx - 1, diff.rx + 1, "RX counter"
        )
        self.assert_in_range(
            self.test_session.rx_packets, diff.tx - 1, diff.tx + 1, "TX counter"
        )
        self.assert_in_range(
            self.test_session.tx_packets_echo,
            diff.rx_echo - 1,
            diff.rx_echo + 1,
            "RX echo counter",
        )
        self.assert_in_range(
            self.test_session.rx_packets_echo,
            diff.tx_echo - 1,
            diff.tx_echo + 1,
            "TX echo counter",
        )
1418
    def test_echo_fail(self):
        """session goes down if echo function fails

        Echo packets sent by VPP are never looped back. The test expects
        exactly one BFD event, reporting Down, and at least one control frame
        in state Down carrying the echo_function_failed diagnostic code.
        """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = (
            self.test_session.detect_mult
            * self.vpp_session.required_min_rx
            / USEC_IN_SEC
        )
        self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
        # echo function should be used now, but we will drop the echo packets
        verified_diag = False
        for dummy in range(3):
            # process captured packets for 75% of the detection time, then
            # send a control frame of our own (after the inner loop)
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    # dropped
                    pass
                elif p.haslayer(BFD):
                    if "P" in p.sprintf("%BFD.flags%"):
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval, 1000000
                        )
                        # answer the poll with a Final frame
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                    if p[BFD].state == BFDState.down:
                        self.assert_equal(
                            p[BFD].diag, BFDDiagCode.echo_function_failed, BFDDiagCode
                        )
                        verified_diag = True
                else:
                    raise Exception(ppp("Received unknown packet:", p))
            self.test_session.send_packet()
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 1, "number of bfd events")
        self.assert_equal(events[0].state, BFDState.down, BFDState)
        self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1460
1461     def test_echo_stop(self):
1462         """echo function stops if peer sets required min echo rx zero"""
1463         bfd_session_up(self)
1464         self.test_session.update(required_min_echo_rx=150000)
1465         self.test_session.send_packet()
1466         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1467         # wait for first echo packet
1468         while True:
1469             p = self.pg0.wait_for_packet(1)
1470             self.logger.debug(ppp("Got packet:", p))
1471             if p[UDP].dport == BFD.udp_dport_echo:
1472                 self.logger.debug(ppp("Looping back packet:", p))
1473                 p[Ether].dst = self.pg0.local_mac
1474                 self.pg0.add_stream(p)
1475                 self.pg_start()
1476                 break
1477             elif p.haslayer(BFD):
1478                 # ignore BFD
1479                 pass
1480             else:
1481                 raise Exception(ppp("Received unknown packet:", p))
1482         self.test_session.update(required_min_echo_rx=0)
1483         self.test_session.send_packet()
1484         # echo packets shouldn't arrive anymore
1485         for dummy in range(5):
1486             wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1487             self.test_session.send_packet()
1488             events = self.vapi.collect_events()
1489             self.assert_equal(len(events), 0, "number of bfd events")
1490
1491     def test_echo_source_removed(self):
1492         """echo function stops if echo source is removed"""
1493         bfd_session_up(self)
1494         self.test_session.update(required_min_echo_rx=150000)
1495         self.test_session.send_packet()
1496         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1497         # wait for first echo packet
1498         while True:
1499             p = self.pg0.wait_for_packet(1)
1500             self.logger.debug(ppp("Got packet:", p))
1501             if p[UDP].dport == BFD.udp_dport_echo:
1502                 self.logger.debug(ppp("Looping back packet:", p))
1503                 p[Ether].dst = self.pg0.local_mac
1504                 self.pg0.add_stream(p)
1505                 self.pg_start()
1506                 break
1507             elif p.haslayer(BFD):
1508                 # ignore BFD
1509                 pass
1510             else:
1511                 raise Exception(ppp("Received unknown packet:", p))
1512         self.vapi.bfd_udp_del_echo_source()
1513         self.test_session.send_packet()
1514         # echo packets shouldn't arrive anymore
1515         for dummy in range(5):
1516             wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1517             self.test_session.send_packet()
1518             events = self.vapi.collect_events()
1519             self.assert_equal(len(events), 0, "number of bfd events")
1520
1521     def test_stale_echo(self):
1522         """stale echo packets don't keep a session up"""
1523         bfd_session_up(self)
1524         self.test_session.update(required_min_echo_rx=150000)
1525         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1526         self.test_session.send_packet()
1527         # should be turned on - loopback echo packets
1528         echo_packet = None
1529         timeout_at = None
1530         timeout_ok = False
1531         for dummy in range(10 * self.vpp_session.detect_mult):
1532             p = self.pg0.wait_for_packet(1)
1533             if p[UDP].dport == BFD.udp_dport_echo:
1534                 if echo_packet is None:
1535                     self.logger.debug(ppp("Got first echo packet:", p))
1536                     echo_packet = p
1537                     timeout_at = (
1538                         time.time()
1539                         + self.vpp_session.detect_mult
1540                         * self.test_session.required_min_echo_rx
1541                         / USEC_IN_SEC
1542                     )
1543                 else:
1544                     self.logger.debug(ppp("Got followup echo packet:", p))
1545                 self.logger.debug(ppp("Looping back first echo packet:", p))
1546                 echo_packet[Ether].dst = self.pg0.local_mac
1547                 self.pg0.add_stream(echo_packet)
1548                 self.pg_start()
1549             elif p.haslayer(BFD):
1550                 self.logger.debug(ppp("Got packet:", p))
1551                 if "P" in p.sprintf("%BFD.flags%"):
1552                     final = self.test_session.create_packet()
1553                     final[BFD].flags = "F"
1554                     self.test_session.send_packet(final)
1555                 if p[BFD].state == BFDState.down:
1556                     self.assertIsNotNone(
1557                         timeout_at,
1558                         "Session went down before first echo packet received",
1559                     )
1560                     now = time.time()
1561                     self.assertGreaterEqual(
1562                         now,
1563                         timeout_at,
1564                         "Session timeout at %s, but is expected at %s"
1565                         % (now, timeout_at),
1566                     )
1567                     self.assert_equal(
1568                         p[BFD].diag, BFDDiagCode.echo_function_failed, BFDDiagCode
1569                     )
1570                     events = self.vapi.collect_events()
1571                     self.assert_equal(len(events), 1, "number of bfd events")
1572                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1573                     timeout_ok = True
1574                     break
1575             else:
1576                 raise Exception(ppp("Received unknown packet:", p))
1577             self.test_session.send_packet()
1578         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1579
1580     def test_invalid_echo_checksum(self):
1581         """echo packets with invalid checksum don't keep a session up"""
1582         bfd_session_up(self)
1583         self.test_session.update(required_min_echo_rx=150000)
1584         self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
1585         self.test_session.send_packet()
1586         # should be turned on - loopback echo packets
1587         timeout_at = None
1588         timeout_ok = False
1589         for dummy in range(10 * self.vpp_session.detect_mult):
1590             p = self.pg0.wait_for_packet(1)
1591             if p[UDP].dport == BFD.udp_dport_echo:
1592                 self.logger.debug(ppp("Got echo packet:", p))
1593                 if timeout_at is None:
1594                     timeout_at = (
1595                         time.time()
1596                         + self.vpp_session.detect_mult
1597                         * self.test_session.required_min_echo_rx
1598                         / USEC_IN_SEC
1599                     )
1600                 p[BFD_vpp_echo].checksum = getrandbits(64)
1601                 p[Ether].dst = self.pg0.local_mac
1602                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1603                 self.pg0.add_stream(p)
1604                 self.pg_start()
1605             elif p.haslayer(BFD):
1606                 self.logger.debug(ppp("Got packet:", p))
1607                 if "P" in p.sprintf("%BFD.flags%"):
1608                     final = self.test_session.create_packet()
1609                     final[BFD].flags = "F"
1610                     self.test_session.send_packet(final)
1611                 if p[BFD].state == BFDState.down:
1612                     self.assertIsNotNone(
1613                         timeout_at,
1614                         "Session went down before first echo packet received",
1615                     )
1616                     now = time.time()
1617                     self.assertGreaterEqual(
1618                         now,
1619                         timeout_at,
1620                         "Session timeout at %s, but is expected at %s"
1621                         % (now, timeout_at),
1622                     )
1623                     self.assert_equal(
1624                         p[BFD].diag, BFDDiagCode.echo_function_failed, BFDDiagCode
1625                     )
1626                     events = self.vapi.collect_events()
1627                     self.assert_equal(len(events), 1, "number of bfd events")
1628                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1629                     timeout_ok = True
1630                     break
1631             else:
1632                 raise Exception(ppp("Received unknown packet:", p))
1633             self.test_session.send_packet()
1634         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1635
1636     def test_admin_up_down(self):
1637         """put session admin-up and admin-down"""
1638         bfd_session_up(self)
1639         self.vpp_session.admin_down()
1640         self.pg0.enable_capture()
1641         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1642         verify_event(self, e, expected_state=BFDState.admin_down)
1643         for dummy in range(2):
1644             p = wait_for_bfd_packet(self)
1645             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1646         # try to bring session up - shouldn't be possible
1647         self.test_session.update(state=BFDState.init)
1648         self.test_session.send_packet()
1649         for dummy in range(2):
1650             p = wait_for_bfd_packet(self)
1651             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1652         self.vpp_session.admin_up()
1653         self.test_session.update(state=BFDState.down)
1654         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1655         verify_event(self, e, expected_state=BFDState.down)
1656         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1657         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1658         self.test_session.send_packet()
1659         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1660         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1661         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1662         verify_event(self, e, expected_state=BFDState.init)
1663         self.test_session.update(state=BFDState.up)
1664         self.test_session.send_packet()
1665         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1666         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1667         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1668         verify_event(self, e, expected_state=BFDState.up)
1669
1670     def test_config_change_remote_demand(self):
1671         """configuration change while peer in demand mode"""
1672         bfd_session_up(self)
1673         demand = self.test_session.create_packet()
1674         demand[BFD].flags = "D"
1675         self.test_session.send_packet(demand)
1676         self.vpp_session.modify_parameters(
1677             required_min_rx=2 * self.vpp_session.required_min_rx
1678         )
1679         p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
1680         # poll bit must be set
1681         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1682         # terminate poll sequence
1683         final = self.test_session.create_packet()
1684         final[BFD].flags = "D+F"
1685         self.test_session.send_packet(final)
1686         # vpp should be quiet now again
1687         transmit_time = (
1688             0.9
1689             * max(self.vpp_session.required_min_rx, self.test_session.desired_min_tx)
1690             / USEC_IN_SEC
1691         )
1692         count = 0
1693         for dummy in range(self.test_session.detect_mult * 2):
1694             self.sleep(transmit_time)
1695             self.test_session.send_packet(demand)
1696             try:
1697                 p = wait_for_bfd_packet(self, timeout=0)
1698                 self.logger.error(ppp("Received unexpected packet:", p))
1699                 count += 1
1700             except CaptureTimeoutError:
1701                 pass
1702         events = self.vapi.collect_events()
1703         for e in events:
1704             self.logger.error("Received unexpected event: %s", e)
1705         self.assert_equal(count, 0, "number of packets received")
1706         self.assert_equal(len(events), 0, "number of events received")
1707
1708     def test_intf_deleted(self):
1709         """interface with bfd session deleted"""
1710         intf = VppLoInterface(self)
1711         intf.config_ip4()
1712         intf.admin_up()
1713         sw_if_index = intf.sw_if_index
1714         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1715         vpp_session.add_vpp_config()
1716         vpp_session.admin_up()
1717         intf.remove_vpp_config()
1718         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1719         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1720         self.assertFalse(vpp_session.query_vpp_config())
1721
1722
@tag_run_solo
@tag_fixme_vpp_workers
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6)"""

    # Class-level defaults. vpp_session and test_session are replaced with
    # real objects in setUp(); pg0 is used right after create_pg_interfaces()
    # in setUpClass(). vpp_clock_offset is only read in this class -
    # NOTE(review): presumably set by a helper such as bfd_session_up().
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        # class-wide fixture: one pg interface and one loopback, both IPv6
        super(BFD6TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            # the loopback is passed to bfd_udp_set_echo_source() in test_echo
            cls.create_loopback_interfaces(1)
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            # undo the parent's class-level setup before propagating
            super(BFD6TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFD6TestCase, cls).tearDownClass()

    def setUp(self):
        # per-test fixture: BFD event subscription, packet capture, a VPP-side
        # IPv6 session towards pg0's remote address and a test-side peer
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            # snapshot the session counters *before* adding our session so
            # that test_session_up can assert on the difference
            self.bfd_udp4_sessions = self.statistics["/bfd/udp4/sessions"]
            self.bfd_udp6_sessions = self.statistics["/bfd/udp6/sessions"]
            self.vpp_session = VppBFDUDPSession(
                self, self.pg0, self.pg0.remote_ip6, af=AF_INET6
            )
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except BaseException:
            # drop the event subscription again if anything above failed
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        # only talk to VPP if it is still alive
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """bring BFD session up"""
        bfd_session_up(self)
        bfd_udp4_sessions = self.statistics["/bfd/udp4/sessions"]
        bfd_udp6_sessions = self.statistics["/bfd/udp6/sessions"]
        # compared with the setUp() snapshot: udp4 unchanged, udp6 one higher
        self.assert_equal(bfd_udp4_sessions, self.bfd_udp4_sessions)
        self.assert_equal(bfd_udp6_sessions - self.bfd_udp6_sessions, 1)

    def test_session_up_by_ip(self):
        """bring BFD session up - first frame looked up by address pair"""
        self.logger.info("BFD: Sending Slow control frame")
        # only my_discriminator is set on the test session before sending
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        # the reply must echo our discriminator back and report init state
        self.assert_equal(
            p[BFD].your_discriminator,
            self.test_session.my_discriminator,
            "BFD - your discriminator",
        )
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        # take the peer discriminator from the reply and move to up
        self.test_session.update(
            your_discriminator=p[BFD].my_discriminator, state=BFDState.up
        )
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up(self):
        """hold BFD session up"""
        bfd_session_up(self)
        # answer every received BFD packet for two detect_mult rounds
        for dummy in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        # no session events expected and the session must still be up
        self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """echo packets looped back"""
        bfd_session_up(self)
        stats_before = bfd_grab_stats_snapshot(self)
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        # src and dst IP are both the remote address of pg0
        echo_packet = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.remote_ip6)
            / UDP(dport=BFD.udp_dport_echo)
            / Raw("this should be looped back")
        )
        # send the packets one by one, each with the next source port
        for dummy in range(echo_packet_count):
            self.sleep(0.01, "delay between echo packets")
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        counter = 0
        bfd_control_packets_rx = 0
        # collect until all echo packets came back; BFD control packets seen
        # in between are only counted
        while counter < echo_packet_count:
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac, ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            udp = p[UDP]
            if udp.dport == BFD.udp_dport:
                bfd_control_packets_rx += 1
                continue
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            self.assert_equal(udp.dport, BFD.udp_dport_echo, "UDP destination port")
            # source ports must come back in the order they were sent
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(
                scapy.compat.raw(p[UDP].payload),
                scapy.compat.raw(echo_packet[UDP].payload),
                "Received packet is not the echo packet sent",
            )
            counter += 1
        # both counters were advanced once per packet, so they must agree
        self.assert_equal(
            udp_sport_tx,
            udp_sport_rx,
            "UDP source port (== ECHO packet identifier for test purposes)",
        )
        stats_after = bfd_grab_stats_snapshot(self)
        diff = bfd_stats_diff(stats_before, stats_after)
        # only tx may have moved, and only by the control packets counted above
        self.assertEqual(0, diff.rx, "RX counter bumped but no BFD packets sent")
        self.assertEqual(bfd_control_packets_rx, diff.tx, "TX counter incorrect")
        self.assertEqual(
            0, diff.rx_echo, "RX echo counter bumped but no BFD session exists"
        )
        self.assertEqual(
            0, diff.tx_echo, "TX echo counter bumped but no BFD session exists"
        )

    def test_echo(self):
        """echo function"""
        stats_before = bfd_grab_stats_snapshot(self)
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        # seconds; the capture loops below each run for 0.75 of this
        detection_time = (
            self.test_session.detect_mult
            * self.vpp_session.required_min_rx
            / USEC_IN_SEC
        )
        # echo shouldn't work without echo source set
        for dummy in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(self, pcap_time_min=time.time() - self.vpp_clock_offset)
        # required min rx still matches the configured session value
        self.assert_equal(
            p[BFD].required_min_rx_interval,
            self.vpp_session.required_min_rx,
            "BFD required min rx interval",
        )
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for dummy in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    # echo packet: destined to pg0's local IP, with a source
                    # that must differ from the loopback's own address
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP"
                    )
                    self.assertNotEqual(
                        p[IPv6].src,
                        self.loopback0.local_ip6,
                        "BFD ECHO src IP equal to loopback IP",
                    )
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(
                        p[Ether].dst,
                        self.pg0.remote_mac,
                        "ECHO packet destination MAC address",
                    )
                    # count it as both received and sent, then return it with
                    # the destination MAC rewritten to pg0's local MAC
                    self.test_session.rx_packets_echo += 1
                    self.test_session.tx_packets_echo += 1
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    self.test_session.rx_packets += 1
                    # once echo has been seen, the advertised required min rx
                    # must be at least 1000000
                    if echo_seen:
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval, 1000000
                        )
                    if "P" in p.sprintf("%BFD.flags%"):
                        # answer the poll with a packet carrying the final flag
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                # no session events may show up while echo is running
                self.assert_equal(
                    len(self.vapi.collect_events()), 0, "number of bfd events"
                )
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

        stats_after = bfd_grab_stats_snapshot(self)
        diff = bfd_stats_diff(stats_before, stats_after)
        # our rx is vpp tx and vice versa, also tolerate one packet off
        self.assert_in_range(
            self.test_session.tx_packets, diff.rx - 1, diff.rx + 1, "RX counter"
        )
        self.assert_in_range(
            self.test_session.rx_packets, diff.tx - 1, diff.tx + 1, "TX counter"
        )
        self.assert_in_range(
            self.test_session.tx_packets_echo,
            diff.rx_echo - 1,
            diff.rx_echo + 1,
            "RX echo counter",
        )
        self.assert_in_range(
            self.test_session.rx_packets_echo,
            diff.tx_echo - 1,
            diff.tx_echo + 1,
            "TX echo counter",
        )

    def test_intf_deleted(self):
        """interface with bfd session deleted"""
        intf = VppLoInterface(self)
        intf.config_ip6()
        intf.admin_up()
        # remember the index before the interface is removed
        sw_if_index = intf.sw_if_index
        vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip6, af=AF_INET6)
        vpp_session.add_vpp_config()
        vpp_session.admin_up()
        intf.remove_vpp_config()
        # an event for that interface must arrive and the session must be gone
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
        self.assertFalse(vpp_session.query_vpp_config())
1997
1998
@tag_run_solo
class BFDFIBTestCase(VppTestCase):
    """BFD-FIB interactions (IPv6)"""

    # replaced with real session objects inside test_session_with_fib()
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDFIBTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(BFDFIBTestCase, cls).tearDownClass()

    def setUp(self):
        # per-test fixture: one pg interface, admin-up with IPv6 configured
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

    def tearDown(self):
        # only talk to VPP if it is still alive
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def _send_and_verify_forwarded(self, pkts):
        """inject pkts on pg0 and expect each of them back, in order"""
        self.pg0.add_stream(pkts)
        self.pg_start()
        for sent in pkts:
            captured = self.pg0.wait_for_packet(
                1, filter_out_fn=self.pkt_is_not_data_traffic
            )
            self.assertEqual(captured[IPv6].dst, sent[IPv6].dst)

    def _send_and_verify_dropped(self, pkts):
        """inject pkts on pg0 and expect no data traffic to be captured"""
        self.pg0.add_stream(pkts)
        self.pg_start()
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(
                1, filter_out_fn=self.pkt_is_not_data_traffic
            )

    def test_session_with_fib(self):
        """BFD-FIB interactions"""

        # packets to match against both of the routes
        p = [
            (
                Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
                / IPv6(src="3001::1", dst="2001::1")
                / UDP(sport=1234, dport=1234)
                / Raw(b"\xa5" * 100)
            ),
            (
                Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
                / IPv6(src="3001::1", dst="2002::1")
                / UDP(sport=1234, dport=1234)
                / Raw(b"\xa5" * 100)
            ),
        ]

        # A recursive and a non-recursive route via a next-hop that
        # will have a BFD session
        ip_2001_s_64 = VppIpRoute(
            self,
            "2001::",
            64,
            [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
        )
        ip_2002_s_64 = VppIpRoute(
            self, "2002::", 64, [VppRoutePath(self.pg0.remote_ip6, 0xFFFFFFFF)]
        )
        ip_2001_s_64.add_vpp_config()
        ip_2002_s_64.add_vpp_config()

        # bring the session up now the routes are present
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6
        )
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        self._send_and_verify_forwarded(p)

        # session is down - traffic is dropped
        bfd_session_down(self)
        self._send_and_verify_dropped(p)

        # session is up again - traffic passes
        bfd_session_up(self)
        self._send_and_verify_forwarded(p)
2109
2110
@unittest.skipUnless(config.extended, "part of extended tests")
class BFDTunTestCase(VppTestCase):
    """BFD over GRE tunnel"""

    # replaced with real session objects inside test_bfd_o_gre()
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        # per-test fixture: one pg interface, admin-up with IPv4 configured
        super().setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for pg_intf in self.pg_interfaces:
            pg_intf.admin_up()
            pg_intf.config_ip4()
            pg_intf.resolve_arp()

    def tearDown(self):
        # only talk to VPP if it is still alive
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=0)

        super().tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_bfd_o_gre(self):
        """BFD-o-GRE"""

        # A GRE interface over which to run a BFD session
        tunnel = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()

        # VPP side of the session, running over the tunnel interface
        self.vpp_session = VppBFDUDPSession(
            self, tunnel, tunnel.remote_ip4, is_tunnel=True
        )
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()

        # test side: IP/GRE tunnel header, physical interface pg0
        outer_header = IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / GRE()
        self.test_session = BFDTestSession(
            self,
            tunnel,
            AF_INET,
            tunnel_header=outer_header,
            phy_interface=self.pg0,
        )

        # a single data packet addressed to the tunnel's remote address
        data_pkt = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=self.pg0.remote_ip4, dst=tunnel.remote_ip4)
            / UDP(sport=1234, dport=1234)
            / Raw(b"\xa5" * 100)
        )
        data_pkts = [data_pkt]

        # session is up - traffic passes
        bfd_session_up(self)

        self.send_and_expect(self.pg0, data_pkts, self.pg0)

        # bring session down
        bfd_session_down(self)
2192
2193
2194 @tag_run_solo
class BFDSHA1TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (SHA1 auth)"""

    pg0 = None
    vpp_clock_offset = None
    # VPP-side and test-side ends of the session; the module-level helpers
    # (bfd_session_up, wait_for_bfd_packet, ...) read them from the test case
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """Enable BFD debug logging and bring pg0 up with IPv4 for the class."""
        super(BFDSHA1TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # run the base-class teardown before propagating the setup failure
            super(BFDSHA1TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFDSHA1TestCase, cls).tearDownClass()

    def setUp(self):
        """Create a fresh key factory, subscribe to BFD events, start capture."""
        super(BFDSHA1TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """Unsubscribe from BFD events (if VPP is alive) and drain the queue."""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDSHA1TestCase, self).tearDown()

    def _add_random_key(self, auth_type=BFDAuthType.keyed_sha1):
        """Create a random auth key of `auth_type`, add it to VPP, return it."""
        key = self.factory.create_random_key(self, auth_type)
        key.add_vpp_config()
        return key

    def _create_session_pair(self, key, admin_up=False, **test_session_args):
        """Create self.vpp_session and self.test_session authenticated by `key`.

        The VPP session is added to VPP (and set admin-up when `admin_up` is
        true) before the test session is built, because the test session
        copies the VPP session's bfd_key_id. Any `test_session_args` are
        passed through to BFDTestSession (e.g. our_seq_number).
        """
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        self.vpp_session.add_vpp_config()
        if admin_up:
            self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id,
            **test_session_args
        )

    def _detection_time_sec(self):
        """Return detect_mult * required_min_rx of the sessions, in seconds."""
        return (
            self.test_session.detect_mult
            * self.vpp_session.required_min_rx
            / USEC_IN_SEC
        )

    def test_session_up(self):
        """bring BFD session up"""
        self._create_session_pair(self._add_random_key(), admin_up=True)
        bfd_session_up(self)

    def test_hold_up(self):
        """hold BFD session up"""
        self._create_session_pair(self._add_random_key(), admin_up=True)
        bfd_session_up(self)
        for _ in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up_meticulous(self):
        """hold BFD session up - meticulous auth"""
        key = self._add_random_key(BFDAuthType.meticulous_keyed_sha1)
        # specify sequence number so that it wraps
        self._create_session_pair(
            key, admin_up=True, our_seq_number=0xFFFFFFFF - 4
        )
        bfd_session_up(self)
        for _ in range(30):
            wait_for_bfd_packet(self)
            self.test_session.inc_seq_num()
            self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_send_bad_seq_number(self):
        """session is not kept alive by msgs with bad sequence numbers"""
        key = self._add_random_key(BFDAuthType.meticulous_keyed_sha1)
        self._create_session_pair(key)
        bfd_session_up(self)
        # keep sending packets without ever incrementing the sequence number
        send_until = time.time() + 2 * self._detection_time_sec()
        while time.time() < send_until:
            self.test_session.send_packet()
            self.sleep(
                0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
                "time between bfd packets",
            )
        e = self.vapi.collect_events()
        # session should be down now, because the sequence numbers weren't
        # updated
        self.assert_equal(len(e), 1, "number of bfd events")
        verify_event(self, e[0], expected_state=BFDState.down)

    def execute_rogue_session_scenario(
        self,
        vpp_bfd_udp_session,
        legitimate_test_session,
        rogue_test_session,
        rogue_bfd_values=None,
    ):
        """execute a rogue session interaction scenario

        1. create vpp session, add config
        2. bring the legitimate session up
        3. copy the bfd values from legitimate session to rogue session
        4. apply rogue_bfd_values to rogue session
        5. set rogue session state to down
        6. send message to take the session down from the rogue session
        7. assert that the legitimate session is unaffected

        :param vpp_bfd_udp_session: VPP session object, not yet added to VPP
        :param legitimate_test_session: test session used to bring VPP up
        :param rogue_test_session: test session sending the forged packet
        :param rogue_bfd_values: optional dict of keyword arguments applied
            to the rogue session via update() after the legitimate values
            were copied
        """

        self.vpp_session = vpp_bfd_udp_session
        self.vpp_session.add_vpp_config()
        self.test_session = legitimate_test_session
        # bring vpp session up
        bfd_session_up(self)
        # send packet from rogue session
        rogue_test_session.update(
            my_discriminator=self.test_session.my_discriminator,
            your_discriminator=self.test_session.your_discriminator,
            desired_min_tx=self.test_session.desired_min_tx,
            required_min_rx=self.test_session.required_min_rx,
            detect_mult=self.test_session.detect_mult,
            diag=self.test_session.diag,
            state=self.test_session.state,
            auth_type=self.test_session.auth_type,
        )
        if rogue_bfd_values:
            rogue_test_session.update(**rogue_bfd_values)
        rogue_test_session.update(state=BFDState.down)
        rogue_test_session.send_packet()
        wait_for_bfd_packet(self)
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_mismatch_auth(self):
        """session is not brought down by unauthenticated msg"""
        key = self._add_random_key()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=vpp_session.bfd_key_id
        )
        # the rogue session carries no authentication at all
        rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
        self.execute_rogue_session_scenario(
            vpp_session, legitimate_test_session, rogue_test_session
        )

    def test_mismatch_bfd_key_id(self):
        """session is not brought down by msg with non-existent key-id"""
        key = self._add_random_key()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        # pick a different random bfd key id
        x = randint(0, 255)
        while x == vpp_session.bfd_key_id:
            x = randint(0, 255)
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=vpp_session.bfd_key_id
        )
        rogue_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x
        )
        self.execute_rogue_session_scenario(
            vpp_session, legitimate_test_session, rogue_test_session
        )

    def test_mismatched_auth_type(self):
        """session is not brought down by msg with wrong auth type"""
        key = self._add_random_key()
        vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        legitimate_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=vpp_session.bfd_key_id
        )
        rogue_test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=vpp_session.bfd_key_id
        )
        # same key and key id, but the rogue packet announces keyed MD5
        self.execute_rogue_session_scenario(
            vpp_session,
            legitimate_test_session,
            rogue_test_session,
            {"auth_type": BFDAuthType.keyed_md5},
        )

    def test_restart(self):
        """simulate remote peer restart and resynchronization"""
        key = self._add_random_key(BFDAuthType.meticulous_keyed_sha1)
        self._create_session_pair(key, our_seq_number=0)
        bfd_session_up(self)
        # don't send any packets for 2*detection_time
        self.sleep(2 * self._detection_time_sec(), "simulating peer restart")
        events = self.vapi.collect_events()
        self.assert_equal(len(events), 1, "number of bfd events")
        verify_event(self, events[0], expected_state=BFDState.down)
        self.test_session.update(state=BFDState.down)
        # reset sequence number
        self.test_session.our_seq_number = 0
        self.test_session.vpp_seq_number = None
        # now throw away any pending packets
        self.pg0.enable_capture()
        self.test_session.my_discriminator = 0
        bfd_session_up(self)
2464
2465
2466 @tag_run_solo
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth)"""

    pg0 = None
    # VPP-side and test-side ends of the session; the module-level helpers
    # (bfd_session_up, wait_for_bfd_packet, ...) read them from the test case
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """Enable BFD debug logging and bring pg0 up with IPv4 for the class."""
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # run the base-class teardown before propagating the setup failure
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(BFDAuthOnOffTestCase, cls).tearDownClass()

    def setUp(self):
        """Create a fresh key factory, subscribe to BFD events, start capture."""
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """Unsubscribe from BFD events (if VPP is alive) and drain the queue."""
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _add_random_key(self):
        """Create a random auth key, add it to VPP and return it."""
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        return key

    def _exchange_packets(self, verify_up=True, inc_seq_num=False):
        """Answer 2 * detect_mult packets from VPP, one reply per packet.

        :param verify_up: assert that every packet received from VPP
            reports state up
        :param inc_seq_num: increment the test session's sequence number
            before each reply
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            if verify_up:
                self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _verify_session_undisturbed(self):
        """Assert the VPP session is up and no BFD event has been queued."""
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0, "number of bfd events")

    def test_auth_on_immediate(self):
        """turn auth on without disturbing session state (immediate)"""
        key = self._add_random_key()
        self.vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_off_immediate(self):
        """turn auth off without disturbing session state (immediate)"""
        key = self._add_random_key()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id,
        )
        bfd_session_up(self)
        self._exchange_packets(inc_seq_num=True)
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._exchange_packets(inc_seq_num=True)
        self._verify_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """change auth key without disturbing session state (immediate)"""
        key1 = self._add_random_key()
        key2 = self._add_random_key()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key1
        )
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id,
        )
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_on_delayed(self):
        """turn auth on without disturbing session state (delayed)"""
        key = self._add_random_key()
        self.vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets(verify_up=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # the test session keeps sending unauthenticated packets for now
        self._exchange_packets()
        # switch the test session to the new key and announce it right away
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_off_delayed(self):
        """turn auth off without disturbing session state (delayed)"""
        key = self._add_random_key()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key
        )
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id,
        )
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # the test session keeps sending authenticated packets for now
        self._exchange_packets()
        # drop authentication on the test side and announce it right away
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """change auth key without disturbing session state (delayed)"""
        key1 = self._add_random_key()
        key2 = self._add_random_key()
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4, sha1_key=key1
        )
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self,
            self.pg0,
            AF_INET,
            sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id,
        )
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2, delayed=True)
        # the test session keeps using the old key for now
        self._exchange_packets()
        # switch the test session to the new key and announce it right away
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()
2689
2690
2691 @tag_run_solo
2692 class BFDCLITestCase(VppTestCase):
2693     """Bidirectional Forwarding Detection (BFD) (CLI)"""
2694
2695     pg0 = None
2696
2697     @classmethod
2698     def setUpClass(cls):
2699         super(BFDCLITestCase, cls).setUpClass()
2700         cls.vapi.cli("set log class bfd level debug")
2701         try:
2702             cls.create_pg_interfaces((0,))
2703             cls.pg0.config_ip4()
2704             cls.pg0.config_ip6()
2705             cls.pg0.resolve_arp()
2706             cls.pg0.resolve_ndp()
2707
2708         except Exception:
2709             super(BFDCLITestCase, cls).tearDownClass()
2710             raise
2711
2712     @classmethod
2713     def tearDownClass(cls):
2714         super(BFDCLITestCase, cls).tearDownClass()
2715
2716     def setUp(self):
2717         super(BFDCLITestCase, self).setUp()
2718         self.factory = AuthKeyFactory()
2719         self.pg0.enable_capture()
2720
2721     def tearDown(self):
2722         try:
2723             self.vapi.want_bfd_events(enable_disable=False)
2724         except UnexpectedApiReturnValueError:
2725             # some tests aren't subscribed, so this is not an issue
2726             pass
2727         self.vapi.collect_events()  # clear the event queue
2728         super(BFDCLITestCase, self).tearDown()
2729
2730     def cli_verify_no_response(self, cli):
2731         """execute a CLI, asserting that the response is empty"""
2732         self.assert_equal(self.vapi.cli(cli), "", "CLI command response")
2733
2734     def cli_verify_response(self, cli, expected):
2735         """execute a CLI, asserting that the response matches expectation"""
2736         try:
2737             reply = self.vapi.cli(cli)
2738         except CliFailedCommandError as cli_error:
2739             reply = str(cli_error)
2740         self.assert_equal(reply.strip(), expected, "CLI command response")
2741
2742     def test_show(self):
2743         """show commands"""
2744         k1 = self.factory.create_random_key(self)
2745         k1.add_vpp_config()
2746         k2 = self.factory.create_random_key(
2747             self, auth_type=BFDAuthType.meticulous_keyed_sha1
2748         )
2749         k2.add_vpp_config()
2750         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2751         s1.add_vpp_config()
2752         s2 = VppBFDUDPSession(
2753             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=k2
2754         )
2755         s2.add_vpp_config()
2756         self.logger.info(self.vapi.ppcli("show bfd keys"))
2757         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2758         self.logger.info(self.vapi.ppcli("show bfd"))
2759
2760     def test_set_del_sha1_key(self):
2761         """set/delete SHA1 auth key"""
2762         k = self.factory.create_random_key(self)
2763         self.registry.register(k, self.logger)
2764         self.cli_verify_no_response(
2765             "bfd key set conf-key-id %s type keyed-sha1 secret %s"
2766             % (
2767                 k.conf_key_id,
2768                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key),
2769             )
2770         )
2771         self.assertTrue(k.query_vpp_config())
2772         self.vpp_session = VppBFDUDPSession(
2773             self, self.pg0, self.pg0.remote_ip4, sha1_key=k
2774         )
2775         self.vpp_session.add_vpp_config()
2776         self.test_session = BFDTestSession(
2777             self, self.pg0, AF_INET, sha1_key=k, bfd_key_id=self.vpp_session.bfd_key_id
2778         )
2779         self.vapi.want_bfd_events()
2780         bfd_session_up(self)
2781         bfd_session_down(self)
2782         # try to replace the secret for the key - should fail because the key
2783         # is in-use
2784         k2 = self.factory.create_random_key(self)
2785         self.cli_verify_response(
2786             "bfd key set conf-key-id %s type keyed-sha1 secret %s"
2787             % (
2788                 k.conf_key_id,
2789                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key),
2790             ),
2791             "bfd key set: `bfd_auth_set_key' API call failed, "
2792             "rv=-103:BFD object in use",
2793         )
2794         # manipulating the session using old secret should still work
2795         bfd_session_up(self)
2796         bfd_session_down(self)
2797         self.vpp_session.remove_vpp_config()
2798         self.cli_verify_no_response("bfd key del conf-key-id %s" % k.conf_key_id)
2799         self.assertFalse(k.query_vpp_config())
2800
2801     def test_set_del_meticulous_sha1_key(self):
2802         """set/delete meticulous SHA1 auth key"""
2803         k = self.factory.create_random_key(
2804             self, auth_type=BFDAuthType.meticulous_keyed_sha1
2805         )
2806         self.registry.register(k, self.logger)
2807         self.cli_verify_no_response(
2808             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s"
2809             % (
2810                 k.conf_key_id,
2811                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key),
2812             )
2813         )
2814         self.assertTrue(k.query_vpp_config())
2815         self.vpp_session = VppBFDUDPSession(
2816             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=k
2817         )
2818         self.vpp_session.add_vpp_config()
2819         self.vpp_session.admin_up()
2820         self.test_session = BFDTestSession(
2821             self, self.pg0, AF_INET6, sha1_key=k, bfd_key_id=self.vpp_session.bfd_key_id
2822         )
2823         self.vapi.want_bfd_events()
2824         bfd_session_up(self)
2825         bfd_session_down(self)
2826         # try to replace the secret for the key - should fail because the key
2827         # is in-use
2828         k2 = self.factory.create_random_key(self)
2829         self.cli_verify_response(
2830             "bfd key set conf-key-id %s type keyed-sha1 secret %s"
2831             % (
2832                 k.conf_key_id,
2833                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key),
2834             ),
2835             "bfd key set: `bfd_auth_set_key' API call failed, "
2836             "rv=-103:BFD object in use",
2837         )
2838         # manipulating the session using old secret should still work
2839         bfd_session_up(self)
2840         bfd_session_down(self)
2841         self.vpp_session.remove_vpp_config()
2842         self.cli_verify_no_response("bfd key del conf-key-id %s" % k.conf_key_id)
2843         self.assertFalse(k.query_vpp_config())
2844
2845     def test_add_mod_del_bfd_udp(self):
2846         """create/modify/delete IPv4 BFD UDP session"""
2847         vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2848         self.registry.register(vpp_session, self.logger)
2849         cli_add_cmd = (
2850             "bfd udp session add interface %s local-addr %s "
2851             "peer-addr %s desired-min-tx %s required-min-rx %s "
2852             "detect-mult %s"
2853             % (
2854                 self.pg0.name,
2855                 self.pg0.local_ip4,
2856                 self.pg0.remote_ip4,
2857                 vpp_session.desired_min_tx,
2858                 vpp_session.required_min_rx,
2859                 vpp_session.detect_mult,
2860             )
2861         )
2862         self.cli_verify_no_response(cli_add_cmd)
2863         # 2nd add should fail
2864         self.cli_verify_response(
2865             cli_add_cmd,
2866             "bfd udp session add: `bfd_add_add_session' API call"
2867             " failed, rv=-101:Duplicate BFD object",
2868         )
2869         verify_bfd_session_config(self, vpp_session)
2870         mod_session = VppBFDUDPSession(
2871             self,
2872             self.pg0,
2873             self.pg0.remote_ip4,
2874             required_min_rx=2 * vpp_session.required_min_rx,
2875             desired_min_tx=3 * vpp_session.desired_min_tx,
2876             detect_mult=4 * vpp_session.detect_mult,
2877         )
2878         self.cli_verify_no_response(
2879             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2880             "desired-min-tx %s required-min-rx %s detect-mult %s"
2881             % (
2882                 self.pg0.name,
2883                 self.pg0.local_ip4,
2884                 self.pg0.remote_ip4,
2885                 mod_session.desired_min_tx,
2886                 mod_session.required_min_rx,
2887                 mod_session.detect_mult,
2888             )
2889         )
2890         verify_bfd_session_config(self, mod_session)
2891         cli_del_cmd = (
2892             "bfd udp session del interface %s local-addr %s "
2893             "peer-addr %s" % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2894         )
2895         self.cli_verify_no_response(cli_del_cmd)
2896         # 2nd del is expected to fail
2897         self.cli_verify_response(
2898             cli_del_cmd,
2899             "bfd udp session del: `bfd_udp_del_session' API call"
2900             " failed, rv=-102:No such BFD object",
2901         )
2902         self.assertFalse(vpp_session.query_vpp_config())
2903
2904     def test_add_mod_del_bfd_udp6(self):
2905         """create/modify/delete IPv6 BFD UDP session"""
2906         vpp_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2907         self.registry.register(vpp_session, self.logger)
2908         cli_add_cmd = (
2909             "bfd udp session add interface %s local-addr %s "
2910             "peer-addr %s desired-min-tx %s required-min-rx %s "
2911             "detect-mult %s"
2912             % (
2913                 self.pg0.name,
2914                 self.pg0.local_ip6,
2915                 self.pg0.remote_ip6,
2916                 vpp_session.desired_min_tx,
2917                 vpp_session.required_min_rx,
2918                 vpp_session.detect_mult,
2919             )
2920         )
2921         self.cli_verify_no_response(cli_add_cmd)
2922         # 2nd add should fail
2923         self.cli_verify_response(
2924             cli_add_cmd,
2925             "bfd udp session add: `bfd_add_add_session' API call"
2926             " failed, rv=-101:Duplicate BFD object",
2927         )
2928         verify_bfd_session_config(self, vpp_session)
2929         mod_session = VppBFDUDPSession(
2930             self,
2931             self.pg0,
2932             self.pg0.remote_ip6,
2933             af=AF_INET6,
2934             required_min_rx=2 * vpp_session.required_min_rx,
2935             desired_min_tx=3 * vpp_session.desired_min_tx,
2936             detect_mult=4 * vpp_session.detect_mult,
2937         )
2938         self.cli_verify_no_response(
2939             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2940             "desired-min-tx %s required-min-rx %s detect-mult %s"
2941             % (
2942                 self.pg0.name,
2943                 self.pg0.local_ip6,
2944                 self.pg0.remote_ip6,
2945                 mod_session.desired_min_tx,
2946                 mod_session.required_min_rx,
2947                 mod_session.detect_mult,
2948             )
2949         )
2950         verify_bfd_session_config(self, mod_session)
2951         cli_del_cmd = (
2952             "bfd udp session del interface %s local-addr %s "
2953             "peer-addr %s" % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6)
2954         )
2955         self.cli_verify_no_response(cli_del_cmd)
2956         # 2nd del is expected to fail
2957         self.cli_verify_response(
2958             cli_del_cmd,
2959             "bfd udp session del: `bfd_udp_del_session' API call"
2960             " failed, rv=-102:No such BFD object",
2961         )
2962         self.assertFalse(vpp_session.query_vpp_config())
2963
2964     def test_add_mod_del_bfd_udp_auth(self):
2965         """create/modify/delete IPv4 BFD UDP session (authenticated)"""
2966         key = self.factory.create_random_key(self)
2967         key.add_vpp_config()
2968         vpp_session = VppBFDUDPSession(
2969             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
2970         )
2971         self.registry.register(vpp_session, self.logger)
2972         cli_add_cmd = (
2973             "bfd udp session add interface %s local-addr %s "
2974             "peer-addr %s desired-min-tx %s required-min-rx %s "
2975             "detect-mult %s conf-key-id %s bfd-key-id %s"
2976             % (
2977                 self.pg0.name,
2978                 self.pg0.local_ip4,
2979                 self.pg0.remote_ip4,
2980                 vpp_session.desired_min_tx,
2981                 vpp_session.required_min_rx,
2982                 vpp_session.detect_mult,
2983                 key.conf_key_id,
2984                 vpp_session.bfd_key_id,
2985             )
2986         )
2987         self.cli_verify_no_response(cli_add_cmd)
2988         # 2nd add should fail
2989         self.cli_verify_response(
2990             cli_add_cmd,
2991             "bfd udp session add: `bfd_add_add_session' API call"
2992             " failed, rv=-101:Duplicate BFD object",
2993         )
2994         verify_bfd_session_config(self, vpp_session)
2995         mod_session = VppBFDUDPSession(
2996             self,
2997             self.pg0,
2998             self.pg0.remote_ip4,
2999             sha1_key=key,
3000             bfd_key_id=vpp_session.bfd_key_id,
3001             required_min_rx=2 * vpp_session.required_min_rx,
3002             desired_min_tx=3 * vpp_session.desired_min_tx,
3003             detect_mult=4 * vpp_session.detect_mult,
3004         )
3005         self.cli_verify_no_response(
3006             "bfd udp session mod interface %s local-addr %s peer-addr %s "
3007             "desired-min-tx %s required-min-rx %s detect-mult %s"
3008             % (
3009                 self.pg0.name,
3010                 self.pg0.local_ip4,
3011                 self.pg0.remote_ip4,
3012                 mod_session.desired_min_tx,
3013                 mod_session.required_min_rx,
3014                 mod_session.detect_mult,
3015             )
3016         )
3017         verify_bfd_session_config(self, mod_session)
3018         cli_del_cmd = (
3019             "bfd udp session del interface %s local-addr %s "
3020             "peer-addr %s" % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3021         )
3022         self.cli_verify_no_response(cli_del_cmd)
3023         # 2nd del is expected to fail
3024         self.cli_verify_response(
3025             cli_del_cmd,
3026             "bfd udp session del: `bfd_udp_del_session' API call"
3027             " failed, rv=-102:No such BFD object",
3028         )
3029         self.assertFalse(vpp_session.query_vpp_config())
3030
3031     def test_add_mod_del_bfd_udp6_auth(self):
3032         """create/modify/delete IPv6 BFD UDP session (authenticated)"""
3033         key = self.factory.create_random_key(
3034             self, auth_type=BFDAuthType.meticulous_keyed_sha1
3035         )
3036         key.add_vpp_config()
3037         vpp_session = VppBFDUDPSession(
3038             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key
3039         )
3040         self.registry.register(vpp_session, self.logger)
3041         cli_add_cmd = (
3042             "bfd udp session add interface %s local-addr %s "
3043             "peer-addr %s desired-min-tx %s required-min-rx %s "
3044             "detect-mult %s conf-key-id %s bfd-key-id %s"
3045             % (
3046                 self.pg0.name,
3047                 self.pg0.local_ip6,
3048                 self.pg0.remote_ip6,
3049                 vpp_session.desired_min_tx,
3050                 vpp_session.required_min_rx,
3051                 vpp_session.detect_mult,
3052                 key.conf_key_id,
3053                 vpp_session.bfd_key_id,
3054             )
3055         )
3056         self.cli_verify_no_response(cli_add_cmd)
3057         # 2nd add should fail
3058         self.cli_verify_response(
3059             cli_add_cmd,
3060             "bfd udp session add: `bfd_add_add_session' API call"
3061             " failed, rv=-101:Duplicate BFD object",
3062         )
3063         verify_bfd_session_config(self, vpp_session)
3064         mod_session = VppBFDUDPSession(
3065             self,
3066             self.pg0,
3067             self.pg0.remote_ip6,
3068             af=AF_INET6,
3069             sha1_key=key,
3070             bfd_key_id=vpp_session.bfd_key_id,
3071             required_min_rx=2 * vpp_session.required_min_rx,
3072             desired_min_tx=3 * vpp_session.desired_min_tx,
3073             detect_mult=4 * vpp_session.detect_mult,
3074         )
3075         self.cli_verify_no_response(
3076             "bfd udp session mod interface %s local-addr %s peer-addr %s "
3077             "desired-min-tx %s required-min-rx %s detect-mult %s"
3078             % (
3079                 self.pg0.name,
3080                 self.pg0.local_ip6,
3081                 self.pg0.remote_ip6,
3082                 mod_session.desired_min_tx,
3083                 mod_session.required_min_rx,
3084                 mod_session.detect_mult,
3085             )
3086         )
3087         verify_bfd_session_config(self, mod_session)
3088         cli_del_cmd = (
3089             "bfd udp session del interface %s local-addr %s "
3090             "peer-addr %s" % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6)
3091         )
3092         self.cli_verify_no_response(cli_del_cmd)
3093         # 2nd del is expected to fail
3094         self.cli_verify_response(
3095             cli_del_cmd,
3096             "bfd udp session del: `bfd_udp_del_session' API call"
3097             " failed, rv=-102:No such BFD object",
3098         )
3099         self.assertFalse(vpp_session.query_vpp_config())
3100
3101     def test_auth_on_off(self):
3102         """turn authentication on and off"""
3103         key = self.factory.create_random_key(
3104             self, auth_type=BFDAuthType.meticulous_keyed_sha1
3105         )
3106         key.add_vpp_config()
3107         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
3108         auth_session = VppBFDUDPSession(
3109             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
3110         )
3111         session.add_vpp_config()
3112         cli_activate = (
3113             "bfd udp session auth activate interface %s local-addr %s "
3114             "peer-addr %s conf-key-id %s bfd-key-id %s"
3115             % (
3116                 self.pg0.name,
3117                 self.pg0.local_ip4,
3118                 self.pg0.remote_ip4,
3119                 key.conf_key_id,
3120                 auth_session.bfd_key_id,
3121             )
3122         )
3123         self.cli_verify_no_response(cli_activate)
3124         verify_bfd_session_config(self, auth_session)
3125         self.cli_verify_no_response(cli_activate)
3126         verify_bfd_session_config(self, auth_session)
3127         cli_deactivate = (
3128             "bfd udp session auth deactivate interface %s local-addr %s "
3129             "peer-addr %s " % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3130         )
3131         self.cli_verify_no_response(cli_deactivate)
3132         verify_bfd_session_config(self, session)
3133         self.cli_verify_no_response(cli_deactivate)
3134         verify_bfd_session_config(self, session)
3135
3136     def test_auth_on_off_delayed(self):
3137         """turn authentication on and off (delayed)"""
3138         key = self.factory.create_random_key(
3139             self, auth_type=BFDAuthType.meticulous_keyed_sha1
3140         )
3141         key.add_vpp_config()
3142         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
3143         auth_session = VppBFDUDPSession(
3144             self, self.pg0, self.pg0.remote_ip4, sha1_key=key
3145         )
3146         session.add_vpp_config()
3147         cli_activate = (
3148             "bfd udp session auth activate interface %s local-addr %s "
3149             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"
3150             % (
3151                 self.pg0.name,
3152                 self.pg0.local_ip4,
3153                 self.pg0.remote_ip4,
3154                 key.conf_key_id,
3155                 auth_session.bfd_key_id,
3156             )
3157         )
3158         self.cli_verify_no_response(cli_activate)
3159         verify_bfd_session_config(self, auth_session)
3160         self.cli_verify_no_response(cli_activate)
3161         verify_bfd_session_config(self, auth_session)
3162         cli_deactivate = (
3163             "bfd udp session auth deactivate interface %s local-addr %s "
3164             "peer-addr %s delayed yes"
3165             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3166         )
3167         self.cli_verify_no_response(cli_deactivate)
3168         verify_bfd_session_config(self, session)
3169         self.cli_verify_no_response(cli_deactivate)
3170         verify_bfd_session_config(self, session)
3171
3172     def test_admin_up_down(self):
3173         """put session admin-up and admin-down"""
3174         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
3175         session.add_vpp_config()
3176         cli_down = (
3177             "bfd udp session set-flags admin down interface %s local-addr %s "
3178             "peer-addr %s " % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3179         )
3180         cli_up = (
3181             "bfd udp session set-flags admin up interface %s local-addr %s "
3182             "peer-addr %s " % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
3183         )
3184         self.cli_verify_no_response(cli_down)
3185         verify_bfd_session_config(self, session, state=BFDState.admin_down)
3186         self.cli_verify_no_response(cli_up)
3187         verify_bfd_session_config(self, session, state=BFDState.down)
3188
3189     def test_set_del_udp_echo_source(self):
3190         """set/del udp echo source"""
3191         self.create_loopback_interfaces(1)
3192         self.loopback0 = self.lo_interfaces[0]
3193         self.loopback0.admin_up()
3194         self.cli_verify_response("show bfd echo-source", "UDP echo source is not set.")
3195         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
3196         self.cli_verify_no_response(cli_set)
3197         self.cli_verify_response(
3198             "show bfd echo-source",
3199             "UDP echo source is: %s\n"
3200             "IPv4 address usable as echo source: none\n"
3201             "IPv6 address usable as echo source: none" % self.loopback0.name,
3202         )
3203         self.loopback0.config_ip4()
3204         echo_ip4 = str(
3205             ipaddress.IPv4Address(
3206                 int(ipaddress.IPv4Address(self.loopback0.local_ip4)) ^ 1
3207             )
3208         )
3209         self.cli_verify_response(
3210             "show bfd echo-source",
3211             "UDP echo source is: %s\n"
3212             "IPv4 address usable as echo source: %s\n"
3213             "IPv6 address usable as echo source: none"
3214             % (self.loopback0.name, echo_ip4),
3215         )
3216         echo_ip6 = str(
3217             ipaddress.IPv6Address(
3218                 int(ipaddress.IPv6Address(self.loopback0.local_ip6)) ^ 1
3219             )
3220         )
3221         self.loopback0.config_ip6()
3222         self.cli_verify_response(
3223             "show bfd echo-source",
3224             "UDP echo source is: %s\n"
3225             "IPv4 address usable as echo source: %s\n"
3226             "IPv6 address usable as echo source: %s"
3227             % (self.loopback0.name, echo_ip4, echo_ip6),
3228         )
3229         cli_del = "bfd udp echo-source del"
3230         self.cli_verify_no_response(cli_del)
3231         self.cli_verify_response("show bfd echo-source", "UDP echo source is not set.")
3232
3233
# Allow running this test module directly as a script; VppTestRunner is
# handed to unittest.main as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)