7a444cb3caff013311e5eda67ffeac9adae4323b
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python3
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 import hashlib
8 import ipaddress
9 import reprlib
10 import time
11 import unittest
12 from random import randint, shuffle, getrandbits
13 from socket import AF_INET, AF_INET6, inet_ntop
14 from struct import pack, unpack
15
16 import scapy.compat
17 from scapy.layers.inet import UDP, IP
18 from scapy.layers.inet6 import IPv6
19 from scapy.layers.l2 import Ether, GRE
20 from scapy.packet import Raw
21
22 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
23     BFDDiagCode, BFDState, BFD_vpp_echo
24 from framework import tag_fixme_vpp_workers
25 from framework import VppTestCase, VppTestRunner, running_extended_tests
26 from framework import tag_run_solo
27 from util import ppp
28 from vpp_ip import DpoProto
29 from vpp_ip_route import VppIpRoute, VppRoutePath
30 from vpp_lo_interface import VppLoInterface
31 from vpp_papi_provider import UnexpectedApiReturnValueError, \
32     CliFailedCommandError
33 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
34 from vpp_gre_interface import VppGreInterface
35 from vpp_papi import VppEnum
36
# number of microseconds in one second
USEC_IN_SEC = 1000000
38
39
class AuthKeyFactory(object):
    """Produces random BFD auth keys, never reusing a conf key ID"""

    def __init__(self):
        # conf key IDs already handed out by this factory instance
        self._conf_key_ids = {}

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """ build a VppBFDAuthKey with random content and an unused conf key ID

        :param test: test case passed through to VppBFDAuthKey
        :param auth_type: authentication type passed through to VppBFDAuthKey

        :returns: new VppBFDAuthKey holding 1 to 20 random bytes of key
        """
        # draw IDs until one turns up that this factory has not issued yet
        while True:
            candidate_id = randint(0, 0xFFFFFFFF)
            if candidate_id not in self._conf_key_ids:
                break
        self._conf_key_ids[candidate_id] = 1
        key_length = randint(1, 20)
        random_octets = bytearray(
            randint(0, 255) for _ in range(key_length))
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=candidate_id,
                             key=scapy.compat.raw(random_octets))
56
57
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API"""

    pg0 = None
    pg1 = None

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces(range(2))
            for intf in cls.pg_interfaces:
                intf.config_ip4()
                intf.config_ip6()
                intf.resolve_arp()
        except Exception:
            # undo the parent class setup before propagating the failure
            super().tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()
        self.factory = AuthKeyFactory()

    # -- private helpers shared by the test methods below --

    def _pg0_ip4_session(self, **kwargs):
        """ build (but do not configure) an IPv4 session towards pg0 peer """
        return VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                **kwargs)

    def _added_key(self):
        """ create a random key and configure it in vpp """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        return key

    def _add_and_remove_twice(self, session):
        """ add and remove the session two times, logging its state """
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def _assert_dump_matches(self, session):
        """ compare timing parameters of session with what vpp dumps """
        dumped = session.get_bfd_udp_session_dump_entry()
        self.assert_equal(session.desired_min_tx,
                          dumped.desired_min_tx,
                          "desired min transmit interval")
        self.assert_equal(session.required_min_rx,
                          dumped.required_min_rx,
                          "required min receive interval")
        self.assert_equal(session.detect_mult, dumped.detect_mult,
                          "detect mult")

    def _assert_keys_configured(self, keys, expected):
        """ assert that every key is (or is not) present in vpp """
        check = self.assertTrue if expected else self.assertFalse
        for key in keys:
            check(key.query_vpp_config())

    # -- tests --

    def test_add_bfd(self):
        """ create a BFD session """
        self._add_and_remove_twice(self._pg0_ip4_session())

    def test_double_add(self):
        """ create the same BFD session twice (negative case) """
        session = self._pg0_ip4_session()
        session.add_vpp_config()

        with self.vapi.assert_negative_api_retval():
            session.add_vpp_config()

        session.remove_vpp_config()

    def test_add_bfd6(self):
        """ create IPv6 BFD session """
        self._add_and_remove_twice(VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6))

    def test_mod_bfd(self):
        """ modify BFD session parameters """
        session = self._pg0_ip4_session(desired_min_tx=50000,
                                        required_min_rx=10000,
                                        detect_mult=1)
        session.add_vpp_config()
        self._assert_dump_matches(session)
        session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
                                  required_min_rx=session.required_min_rx * 2,
                                  detect_mult=session.detect_mult * 2)
        self._assert_dump_matches(session)

    def test_upd_bfd(self):
        """ Create/Modify w/ Update BFD session parameters """
        session = self._pg0_ip4_session(desired_min_tx=50000,
                                        required_min_rx=10000,
                                        detect_mult=1)
        session.upd_vpp_config()
        self._assert_dump_matches(session)
        session.upd_vpp_config(desired_min_tx=session.desired_min_tx * 2,
                               required_min_rx=session.required_min_rx * 2,
                               detect_mult=session.detect_mult * 2)
        self._assert_dump_matches(session)

    def test_add_sha1_keys(self):
        """ add SHA1 keys """
        key_count = 10
        keys = [self.factory.create_random_key(self)
                for _ in range(0, key_count)]
        self._assert_keys_configured(keys, False)
        for key in keys:
            key.add_vpp_config()
        self._assert_keys_configured(keys, True)
        # remove in random order, re-checking every key after each removal
        removal_order = list(range(key_count))
        shuffle(removal_order)
        gone = []
        for victim in removal_order:
            keys[victim].remove_vpp_config()
            gone.append(victim)
            for position, key in enumerate(keys):
                if position in gone:
                    self.assertFalse(key.query_vpp_config())
                else:
                    self.assertTrue(key.query_vpp_config())
        # nothing should be left at this point
        self._assert_keys_configured(keys, False)
        # second round: add everything back and remove it again
        for key in keys:
            key.add_vpp_config()
        self._assert_keys_configured(keys, True)
        for key in keys:
            key.remove_vpp_config()
        self._assert_keys_configured(keys, False)

    def test_add_bfd_sha1(self):
        """ create a BFD session (SHA1) """
        key = self._added_key()
        self._add_and_remove_twice(self._pg0_ip4_session(sha1_key=key))

    def test_double_add_sha1(self):
        """ create the same BFD session twice (negative case) (SHA1) """
        key = self._added_key()
        session = self._pg0_ip4_session(sha1_key=key)
        session.add_vpp_config()
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_add_auth_nonexistent_key(self):
        """ create BFD session using non-existent SHA1 (negative case) """
        # the key is created on the test side only, never added to vpp
        unconfigured_key = self.factory.create_random_key(self)
        session = self._pg0_ip4_session(sha1_key=unconfigured_key)
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_shared_sha1_key(self):
        """ single SHA1 key shared by multiple BFD sessions """
        key = self._added_key()
        sessions = []
        for intf in (self.pg0, self.pg1):
            sessions.append(VppBFDUDPSession(self, intf, intf.remote_ip4,
                                             sha1_key=key))
            sessions.append(VppBFDUDPSession(self, intf, intf.remote_ip6,
                                             sha1_key=key, af=AF_INET6))
        for session in sessions:
            session.add_vpp_config()
        still_using = len(sessions)
        for session in sessions:
            entry = key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(entry.use_count, still_using,
                              "Use count for shared key")
            session.remove_vpp_config()
            still_using -= 1
        entry = key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(entry.use_count, still_using,
                          "Use count for shared key")

    def test_activate_auth(self):
        """ activate SHA1 authentication """
        key = self._added_key()
        session = self._pg0_ip4_session()
        session.add_vpp_config()
        session.activate_auth(key)

    def test_deactivate_auth(self):
        """ deactivate SHA1 authentication """
        key = self._added_key()
        session = self._pg0_ip4_session()
        session.add_vpp_config()
        session.activate_auth(key)
        session.deactivate_auth()

    def test_change_key(self):
        """ change SHA1 key """
        old_key = self.factory.create_random_key(self)
        new_key = self.factory.create_random_key(self)
        while new_key.conf_key_id == old_key.conf_key_id:
            new_key = self.factory.create_random_key(self)
        old_key.add_vpp_config()
        new_key.add_vpp_config()
        session = self._pg0_ip4_session(sha1_key=old_key)
        session.add_vpp_config()
        session.activate_auth(new_key)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()

        def query():
            return self.vapi.bfd_udp_get_echo_source()

        def assert_not_set(source):
            self.assertFalse(source.is_set)
            self.assertFalse(source.have_usable_ip4)
            self.assertFalse(source.have_usable_ip6)

        def assert_set_on_loopback(source):
            self.assertTrue(source.is_set)
            self.assertEqual(source.sw_if_index,
                             self.loopback0.sw_if_index)

        def with_last_bit_flipped(address_class, address):
            # expected echo address: interface address with lowest bit
            # inverted, in packed form
            return address_class(int(address_class(address)) ^ 1).packed

        # nothing configured yet
        assert_not_set(query())

        # echo source set, but the loopback has no addresses
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        source = query()
        assert_set_on_loopback(source)
        self.assertFalse(source.have_usable_ip4)
        self.assertFalse(source.have_usable_ip6)

        # IPv4 address configured on the loopback
        self.loopback0.config_ip4()
        expected_ip4 = with_last_bit_flipped(ipaddress.IPv4Address,
                                             self.loopback0.local_ip4)
        source = query()
        assert_set_on_loopback(source)
        self.assertTrue(source.have_usable_ip4)
        self.assertEqual(source.ip4_addr.packed, expected_ip4)
        self.assertFalse(source.have_usable_ip6)

        # IPv6 address configured as well
        self.loopback0.config_ip6()
        expected_ip6 = with_last_bit_flipped(ipaddress.IPv6Address,
                                             self.loopback0.local_ip6)
        source = query()
        assert_set_on_loopback(source)
        self.assertTrue(source.have_usable_ip4)
        self.assertEqual(source.ip4_addr.packed, expected_ip4)
        self.assertTrue(source.have_usable_ip6)
        self.assertEqual(source.ip6_addr.packed, expected_ip6)

        # echo source removed again
        self.vapi.bfd_udp_del_echo_source()
        assert_not_set(query())
343
344
class BFDTestSession(object):
    """ Test-framework side of a BFD session (the peer vpp talks to) """

    def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
                 bfd_key_id=None, our_seq_number=None,
                 tunnel_header=None, phy_interface=None):
        self.test = test
        self.af = af
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.interface = interface
        # packets go out on the physical interface; it defaults to the
        # interface the session addresses are taken from
        self.phy_interface = phy_interface if phy_interface else interface
        self.udp_sport = randint(49152, 65535)
        self.our_seq_number = (randint(0, 40000000)
                               if our_seq_number is None
                               else our_seq_number)
        self.vpp_seq_number = None
        self.my_discriminator = 0
        self.desired_min_tx = 300000
        self.required_min_rx = 300000
        self.required_min_echo_rx = None
        self.detect_mult = detect_mult
        self.diag = BFDDiagCode.no_diagnostic
        self.your_discriminator = None
        self.state = BFDState.down
        self.auth_type = BFDAuthType.no_auth
        self.tunnel_header = tunnel_header

    def inc_seq_num(self):
        """ advance our sequence number by one, wrapping 0xFFFFFFFF to 0 """
        current = self.our_seq_number
        self.our_seq_number = 0 if current == 0xFFFFFFFF else current + 1

    def update(self, my_discriminator=None, your_discriminator=None,
               desired_min_tx=None, required_min_rx=None,
               required_min_echo_rx=None, detect_mult=None,
               diag=None, state=None, auth_type=None):
        """ overwrite session parameters; arguments left as None are kept """
        requested = (
            ("my_discriminator", my_discriminator),
            ("your_discriminator", your_discriminator),
            ("required_min_rx", required_min_rx),
            ("required_min_echo_rx", required_min_echo_rx),
            ("desired_min_tx", desired_min_tx),
            ("detect_mult", detect_mult),
            ("diag", diag),
            ("state", state),
            ("auth_type", auth_type),
        )
        for attribute, new_value in requested:
            if new_value is not None:
                setattr(self, attribute, new_value)

    def fill_packet_fields(self, packet):
        """ copy every truthy session parameter into the packet's BFD layer

        Parameters whose value is falsy (None, 0) are not written to the
        packet.
        """
        bfd = packet[BFD]
        debug = self.test.logger.debug
        # (session attribute, BFD layer field, debug message)
        mapping = (
            ("my_discriminator", "my_discriminator",
             "BFD: setting packet.my_discriminator=%s"),
            ("your_discriminator", "your_discriminator",
             "BFD: setting packet.your_discriminator=%s"),
            ("required_min_rx", "required_min_rx_interval",
             "BFD: setting packet.required_min_rx_interval=%s"),
            ("required_min_echo_rx", "required_min_echo_rx_interval",
             "BFD: setting packet.required_min_echo_rx=%s"),
            ("desired_min_tx", "desired_min_tx_interval",
             "BFD: setting packet.desired_min_tx_interval=%s"),
            ("detect_mult", "detect_mult",
             "BFD: setting packet.detect_mult=%s"),
            ("diag", "diag",
             "BFD: setting packet.diag=%s"),
            ("state", "state",
             "BFD: setting packet.state=%s"),
            # this is used by a negative test-case
            ("auth_type", "auth_type",
             "BFD: setting packet.auth_type=%s"),
        )
        for attribute, packet_field, message in mapping:
            value = getattr(self, attribute)
            if value:
                debug(message, value)
                setattr(bfd, packet_field, value)

    def create_packet(self):
        """ build a BFD packet from the current session state

        With a SHA1 key set, the authentication section is filled in and
        the key hash is computed over the finished BFD layer.
        """
        if not self.sha1_key:
            bfd = BFD()
        else:
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        packet = Ether(src=self.phy_interface.remote_mac,
                       dst=self.phy_interface.local_mac)
        if self.tunnel_header:
            packet = packet / self.tunnel_header
        if self.af == AF_INET6:
            network = IPv6(src=self.interface.remote_ip6,
                           dst=self.interface.local_ip6,
                           hlim=255)
        else:
            network = IP(src=self.interface.remote_ip4,
                         dst=self.interface.local_ip4,
                         ttl=255)
        transport = UDP(sport=self.udp_sport, dport=BFD.udp_dport)
        packet = packet / network / transport / bfd
        self.test.logger.debug("BFD: Creating packet")
        self.fill_packet_fields(packet)
        if self.sha1_key:
            # hash input: first 32 bytes of the BFD layer followed by the
            # key, zero-padded to 20 bytes
            secret = self.sha1_key.key
            padding = b"\0" * (20 - len(secret))
            sha1 = hashlib.sha1(
                scapy.compat.raw(packet[BFD])[:32] + secret + padding)
            self.test.logger.debug("BFD: Calculated SHA1 hash: %s",
                                   sha1.hexdigest())
            packet[BFD].auth_key_hash = sha1.digest()
        return packet

    def send_packet(self, packet=None, interface=None):
        """ transmit a packet; both the packet and the interface default

        :param packet: packet to send, built by create_packet() if None
        :param interface: outgoing interface, phy_interface if None
        """
        outgoing = self.create_packet() if packet is None else packet
        via = self.phy_interface if interface is None else interface
        self.test.logger.debug(ppp("Sending packet:", outgoing))
        via.add_stream(outgoing)
        self.test.pg_start()

    def verify_sha1_auth(self, packet):
        """ check the SHA1 authentication section of a received packet

        Verifies the fixed fields, the progression of vpp's sequence
        number relative to the previously seen one, and the key hash.
        """
        bfd = packet[BFD]
        test = self.test
        test.assert_equal(bfd.auth_len, 28, "Auth section length")
        test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
                          BFDAuthType)
        test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        test.assert_equal(bfd.auth_reserved, 0, "Reserved")
        received = bfd.auth_seq_num
        previous = self.vpp_seq_number
        if previous is None:
            self.vpp_seq_number = received
            test.logger.debug("Received initial sequence number: %s",
                              self.vpp_seq_number)
        else:
            test.logger.debug("Received followup sequence number: %s",
                              received)
            meticulous = (self.sha1_key.auth_type ==
                          BFDAuthType.meticulous_keyed_sha1)
            about_to_wrap = not previous < 0xffffffff
            if meticulous:
                # meticulous mode: number must advance by exactly one
                following = 0 if about_to_wrap else previous + 1
                test.assert_equal(received, following,
                                  "BFD sequence number")
            elif about_to_wrap:
                test.assertIn(received, (previous, 0),
                              "BFD sequence number not one of (%s, 0)" %
                              previous)
            else:
                # non-meticulous: same number or the next one
                test.assert_in_range(received, previous, previous + 1,
                                     "BFD sequence number")
            self.vpp_seq_number = received
        # the trailing 20 bytes hold the hash - swap them for the key
        # zero-padded to 20 bytes and hash that to get the expected value
        secret = self.sha1_key.key
        padding = b"\0" * (20 - len(secret))
        wanted = hashlib.sha1(
            bfd.original[:-20] + secret + padding).hexdigest()
        test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
                          wanted.encode(), "Auth key hash")

    def verify_bfd(self, packet):
        """ check BFD version, discriminator and (if keyed) SHA1 auth """
        layer = packet[BFD]
        self.test.assert_equal(layer.version, 1, "BFD version")
        self.test.assert_equal(layer.your_discriminator,
                               self.my_discriminator,
                               "BFD - your discriminator")
        if self.sha1_key:
            self.verify_sha1_auth(packet)
554
555
def bfd_session_up(test):
    """ Drive the BFD session to Up state

    Waits for a packet from vpp, answers with Init, expects an Up event
    from vpp and finally sends a packet in Up state. Along the way the
    offset between local time and the packet timestamp is stored in
    test.vpp_clock_offset and compared with the previous value, if any.
    """

    def advance_seq_if_meticulous():
        # with meticulous keyed SHA1 every sent packet needs a new number
        key = test.test_session.sha1_key
        if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
            test.test_session.inc_seq_num()

    test.logger.info("BFD: Waiting for slow hello")
    hello = wait_for_bfd_packet(test, 2,
                                is_tunnel=test.vpp_session.is_tunnel)
    previous_offset = getattr(test, 'vpp_clock_offset', None)
    test.vpp_clock_offset = time.time() - float(hello.time)
    test.logger.debug("BFD: Calculated vpp clock offset: %s",
                      test.vpp_clock_offset)
    if previous_offset:
        test.assertAlmostEqual(
            previous_offset, test.vpp_clock_offset, delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)" %
            (test.vpp_clock_offset, previous_offset))

    test.logger.info("BFD: Sending Init")
    test.test_session.update(my_discriminator=randint(0, 40000000),
                             your_discriminator=hello[BFD].my_discriminator,
                             state=BFDState.init)
    advance_seq_if_meticulous()
    test.test_session.send_packet()

    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_event")
    verify_event(test, event, expected_state=BFDState.up)

    test.logger.info("BFD: Session is Up")
    test.test_session.update(state=BFDState.up)
    advance_seq_if_meticulous()
    test.test_session.send_packet()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
589
590
def bfd_session_down(test):
    """ Drive an Up BFD session to Down state

    Sends a packet in Down state and expects vpp to report a Down event.
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    peer = test.test_session
    peer.update(state=BFDState.down)
    key = peer.sha1_key
    # with meticulous keyed SHA1 every sent packet needs a new number
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        peer.inc_seq_num()
    peer.send_packet()
    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_event")
    verify_event(test, event, expected_state=BFDState.down)
    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
604
605
def verify_bfd_session_config(test, session, state=None):
    """ Compare vpp's dump entry for the session with the session object

    :param test: test case providing the assert methods
    :param session: session whose dump entry is fetched and checked
    :param state: expected session state; not checked when falsy
    """
    entry = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(entry)
    # a non-None entry means get_bfd_udp_session_dump_entry already
    # matched sw_if_index and the addresses
    if state:
        test.assert_equal(entry.state, state, "session state")
    for attribute, label in (("required_min_rx", "required min rx interval"),
                             ("desired_min_tx", "desired min tx interval"),
                             ("detect_mult", "detect multiplier")):
        test.assert_equal(getattr(entry, attribute),
                          getattr(session, attribute), label)
    if session.sha1_key is None:
        test.assert_equal(entry.is_authenticated, 0, "is_authenticated flag")
        return
    test.assert_equal(entry.is_authenticated, 1, "is_authenticated flag")
    test.assert_equal(entry.bfd_key_id, session.bfd_key_id, "bfd key id")
    test.assert_equal(entry.conf_key_id, session.sha1_key.conf_key_id,
                      "config key id")
628
629
def verify_ip(test, packet):
    """ Check TTL/hop limit and addresses of the packet's IP layer

    The address family of test.vpp_session selects whether the IPv6 or
    the IPv4 layer is inspected.
    """
    if test.vpp_session.af != AF_INET6:
        header = packet[IP]
        expected_src = test.vpp_session.interface.local_ip4
        expected_dst = test.vpp_session.interface.remote_ip4
        test.assert_equal(header.ttl, 255, "IPv4 TTL")
    else:
        header = packet[IPv6]
        expected_src = test.vpp_session.interface.local_ip6
        expected_dst = test.vpp_session.interface.remote_ip6
        test.assert_equal(header.hlim, 255, "IPv6 hop limit")
    # the packet was sent by vpp, so its source is the interface local IP
    test.assert_equal(header.src, expected_src, "IP source address")
    test.assert_equal(header.dst, expected_dst, "IP destination address")
644
645
def verify_udp(test, packet):
    """ Check the UDP ports of the packet against the BFD port values """
    datagram = packet[UDP]
    test.assert_equal(datagram.dport, BFD.udp_dport,
                      "UDP destination port")
    lowest, highest = BFD.udp_sport_min, BFD.udp_sport_max
    test.assert_in_range(datagram.sport, lowest, highest,
                         "UDP source port")
652
653
def verify_event(test, event, expected_state):
    """ Verify correctness of event values.

    :param test: test case; its vpp_session supplies the expected
                 interface index and addresses
    :param event: BFD session event to check
    :param expected_state: state the event is expected to report
    """
    e = event
    test.logger.debug("BFD: Event: %s", reprlib.repr(e))
    test.assert_equal(e.sw_if_index,
                      test.vpp_session.interface.sw_if_index,
                      "BFD interface index")

    # labels are address-family neutral, the session may be IPv4 or IPv6
    test.assert_equal(str(e.local_addr), test.vpp_session.local_addr,
                      "Local IP address")
    test.assert_equal(str(e.peer_addr), test.vpp_session.peer_addr,
                      "Peer IP address")
    test.assert_equal(e.state, expected_state, BFDState)
667
668
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
    """ wait for BFD packet and verify its correctness

    :param test: test case; packets are read from test.pg0 and the BFD
                 layer is verified by test.test_session
    :param timeout: how long to wait in total, in seconds
    :param pcap_time_min: ignore packets with pcap timestamp lower than this
    :param is_tunnel: if set, the payload of the packet's IPv4 layer is
                      verified and returned instead of the whole packet

    :returns: the received packet (the inner packet when is_tunnel is set)
    :raises CaptureTimeoutError: if the deadline passes before a usable
                                 packet is captured
    :raises Exception: if the packet has no BFD layer or the BFD layer
                       carries a payload
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is not None and p.time < pcap_time_min:
            test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
                                  "pcap time min %s):" %
                                  (p.time, pcap_time_min), p))
        else:
            break
    if is_tunnel:
        # strip an IP layer and move to the next
        p = p[IP].payload

    # getlayer() yields None for a missing layer, whereas p[BFD] would
    # raise IndexError and the check below could never trigger
    bfd = p.getlayer(BFD)
    if bfd is None:
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
708
709
710 @tag_run_solo
711 class BFD4TestCase(VppTestCase):
712     """Bidirectional Forwarding Detection (BFD)"""
713
714     pg0 = None
715     vpp_clock_offset = None
716     vpp_session = None
717     test_session = None
718
719     @classmethod
720     def setUpClass(cls):
721         super(BFD4TestCase, cls).setUpClass()
722         cls.vapi.cli("set log class bfd level debug")
723         try:
724             cls.create_pg_interfaces([0])
725             cls.create_loopback_interfaces(1)
726             cls.loopback0 = cls.lo_interfaces[0]
727             cls.loopback0.config_ip4()
728             cls.loopback0.admin_up()
729             cls.pg0.config_ip4()
730             cls.pg0.configure_ipv4_neighbors()
731             cls.pg0.admin_up()
732             cls.pg0.resolve_arp()
733
734         except Exception:
735             super(BFD4TestCase, cls).tearDownClass()
736             raise
737
738     @classmethod
739     def tearDownClass(cls):
740         super(BFD4TestCase, cls).tearDownClass()
741
742     def setUp(self):
743         super(BFD4TestCase, self).setUp()
744         self.factory = AuthKeyFactory()
745         self.vapi.want_bfd_events()
746         self.pg0.enable_capture()
747         try:
748             self.vpp_session = VppBFDUDPSession(self, self.pg0,
749                                                 self.pg0.remote_ip4)
750             self.vpp_session.add_vpp_config()
751             self.vpp_session.admin_up()
752             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
753         except BaseException:
754             self.vapi.want_bfd_events(enable_disable=0)
755             raise
756
757     def tearDown(self):
758         if not self.vpp_dead:
759             self.vapi.want_bfd_events(enable_disable=0)
760         self.vapi.collect_events()  # clear the event queue
761         super(BFD4TestCase, self).tearDown()
762
763     def test_session_up(self):
764         """ bring BFD session up """
765         bfd_session_up(self)
766
767     def test_session_up_by_ip(self):
768         """ bring BFD session up - first frame looked up by address pair """
769         self.logger.info("BFD: Sending Slow control frame")
770         self.test_session.update(my_discriminator=randint(0, 40000000))
771         self.test_session.send_packet()
772         self.pg0.enable_capture()
773         p = self.pg0.wait_for_packet(1)
774         self.assert_equal(p[BFD].your_discriminator,
775                           self.test_session.my_discriminator,
776                           "BFD - your discriminator")
777         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
778         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
779                                  state=BFDState.up)
780         self.logger.info("BFD: Waiting for event")
781         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
782         verify_event(self, e, expected_state=BFDState.init)
783         self.logger.info("BFD: Sending Up")
784         self.test_session.send_packet()
785         self.logger.info("BFD: Waiting for event")
786         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
787         verify_event(self, e, expected_state=BFDState.up)
788         self.logger.info("BFD: Session is Up")
789         self.test_session.update(state=BFDState.up)
790         self.test_session.send_packet()
791         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
792
793     def test_session_down(self):
794         """ bring BFD session down """
795         bfd_session_up(self)
796         bfd_session_down(self)
797
798     def test_hold_up(self):
799         """ hold BFD session up """
800         bfd_session_up(self)
801         for dummy in range(self.test_session.detect_mult * 2):
802             wait_for_bfd_packet(self)
803             self.test_session.send_packet()
804         self.assert_equal(len(self.vapi.collect_events()), 0,
805                           "number of bfd events")
806
807     def test_slow_timer(self):
808         """ verify slow periodic control frames while session down """
809         packet_count = 3
810         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
811         prev_packet = wait_for_bfd_packet(self, 2)
812         for dummy in range(packet_count):
813             next_packet = wait_for_bfd_packet(self, 2)
814             time_diff = next_packet.time - prev_packet.time
815             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
816             # to work around timing issues
817             self.assert_in_range(
818                 time_diff, 0.70, 1.05, "time between slow packets")
819             prev_packet = next_packet
820
821     def test_zero_remote_min_rx(self):
822         """ no packets when zero remote required min rx interval """
823         bfd_session_up(self)
824         self.test_session.update(required_min_rx=0)
825         self.test_session.send_packet()
826         for dummy in range(self.test_session.detect_mult):
827             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
828                        "sleep before transmitting bfd packet")
829             self.test_session.send_packet()
830             try:
831                 p = wait_for_bfd_packet(self, timeout=0)
832                 self.logger.error(ppp("Received unexpected packet:", p))
833             except CaptureTimeoutError:
834                 pass
835         self.assert_equal(
836             len(self.vapi.collect_events()), 0, "number of bfd events")
837         self.test_session.update(required_min_rx=300000)
838         for dummy in range(3):
839             self.test_session.send_packet()
840             wait_for_bfd_packet(
841                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
842         self.assert_equal(
843             len(self.vapi.collect_events()), 0, "number of bfd events")
844
845     def test_conn_down(self):
846         """ verify session goes down after inactivity """
847         bfd_session_up(self)
848         detection_time = self.test_session.detect_mult *\
849             self.vpp_session.required_min_rx / USEC_IN_SEC
850         self.sleep(detection_time, "waiting for BFD session time-out")
851         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
852         verify_event(self, e, expected_state=BFDState.down)
853
854     def test_peer_discr_reset_sess_down(self):
855         """ peer discriminator reset after session goes down """
856         bfd_session_up(self)
857         detection_time = self.test_session.detect_mult *\
858             self.vpp_session.required_min_rx / USEC_IN_SEC
859         self.sleep(detection_time, "waiting for BFD session time-out")
860         self.test_session.my_discriminator = 0
861         wait_for_bfd_packet(self,
862                             pcap_time_min=time.time() - self.vpp_clock_offset)
863
864     def test_large_required_min_rx(self):
865         """ large remote required min rx interval """
866         bfd_session_up(self)
867         p = wait_for_bfd_packet(self)
868         interval = 3000000
869         self.test_session.update(required_min_rx=interval)
870         self.test_session.send_packet()
871         time_mark = time.time()
872         count = 0
873         # busy wait here, trying to collect a packet or event, vpp is not
874         # allowed to send packets and the session will timeout first - so the
875         # Up->Down event must arrive before any packets do
876         while time.time() < time_mark + interval / USEC_IN_SEC:
877             try:
878                 p = wait_for_bfd_packet(self, timeout=0)
879                 # if vpp managed to send a packet before we did the session
880                 # session update, then that's fine, ignore it
881                 if p.time < time_mark - self.vpp_clock_offset:
882                     continue
883                 self.logger.error(ppp("Received unexpected packet:", p))
884                 count += 1
885             except CaptureTimeoutError:
886                 pass
887             events = self.vapi.collect_events()
888             if len(events) > 0:
889                 verify_event(self, events[0], BFDState.down)
890                 break
891         self.assert_equal(count, 0, "number of packets received")
892
893     def test_immediate_remote_min_rx_reduction(self):
894         """ immediately honor remote required min rx reduction """
895         self.vpp_session.remove_vpp_config()
896         self.vpp_session = VppBFDUDPSession(
897             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
898         self.pg0.enable_capture()
899         self.vpp_session.add_vpp_config()
900         self.test_session.update(desired_min_tx=1000000,
901                                  required_min_rx=1000000)
902         bfd_session_up(self)
903         reference_packet = wait_for_bfd_packet(self)
904         time_mark = time.time()
905         interval = 300000
906         self.test_session.update(required_min_rx=interval)
907         self.test_session.send_packet()
908         extra_time = time.time() - time_mark
909         p = wait_for_bfd_packet(self)
910         # first packet is allowed to be late by time we spent doing the update
911         # calculated in extra_time
912         self.assert_in_range(p.time - reference_packet.time,
913                              .95 * 0.75 * interval / USEC_IN_SEC,
914                              1.05 * interval / USEC_IN_SEC + extra_time,
915                              "time between BFD packets")
916         reference_packet = p
917         for dummy in range(3):
918             p = wait_for_bfd_packet(self)
919             diff = p.time - reference_packet.time
920             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
921                                  1.05 * interval / USEC_IN_SEC,
922                                  "time between BFD packets")
923             reference_packet = p
924
925     def test_modify_req_min_rx_double(self):
926         """ modify session - double required min rx """
927         bfd_session_up(self)
928         p = wait_for_bfd_packet(self)
929         self.test_session.update(desired_min_tx=10000,
930                                  required_min_rx=10000)
931         self.test_session.send_packet()
932         # double required min rx
933         self.vpp_session.modify_parameters(
934             required_min_rx=2 * self.vpp_session.required_min_rx)
935         p = wait_for_bfd_packet(
936             self, pcap_time_min=time.time() - self.vpp_clock_offset)
937         # poll bit needs to be set
938         self.assertIn("P", p.sprintf("%BFD.flags%"),
939                       "Poll bit not set in BFD packet")
940         # finish poll sequence with final packet
941         final = self.test_session.create_packet()
942         final[BFD].flags = "F"
943         timeout = self.test_session.detect_mult * \
944             max(self.test_session.desired_min_tx,
945                 self.vpp_session.required_min_rx) / USEC_IN_SEC
946         self.test_session.send_packet(final)
947         time_mark = time.time()
948         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_event")
949         verify_event(self, e, expected_state=BFDState.down)
950         time_to_event = time.time() - time_mark
951         self.assert_in_range(time_to_event, .9 * timeout,
952                              1.1 * timeout, "session timeout")
953
954     def test_modify_req_min_rx_halve(self):
955         """ modify session - halve required min rx """
956         self.vpp_session.modify_parameters(
957             required_min_rx=2 * self.vpp_session.required_min_rx)
958         bfd_session_up(self)
959         p = wait_for_bfd_packet(self)
960         self.test_session.update(desired_min_tx=10000,
961                                  required_min_rx=10000)
962         self.test_session.send_packet()
963         p = wait_for_bfd_packet(
964             self, pcap_time_min=time.time() - self.vpp_clock_offset)
965         # halve required min rx
966         old_required_min_rx = self.vpp_session.required_min_rx
967         self.vpp_session.modify_parameters(
968             required_min_rx=self.vpp_session.required_min_rx // 2)
969         # now we wait 0.8*3*old-req-min-rx and the session should still be up
970         self.sleep(0.8 * self.vpp_session.detect_mult *
971                    old_required_min_rx / USEC_IN_SEC,
972                    "wait before finishing poll sequence")
973         self.assert_equal(len(self.vapi.collect_events()), 0,
974                           "number of bfd events")
975         p = wait_for_bfd_packet(self)
976         # poll bit needs to be set
977         self.assertIn("P", p.sprintf("%BFD.flags%"),
978                       "Poll bit not set in BFD packet")
979         # finish poll sequence with final packet
980         final = self.test_session.create_packet()
981         final[BFD].flags = "F"
982         self.test_session.send_packet(final)
983         # now the session should time out under new conditions
984         detection_time = self.test_session.detect_mult *\
985             self.vpp_session.required_min_rx / USEC_IN_SEC
986         before = time.time()
987         e = self.vapi.wait_for_event(
988             2 * detection_time, "bfd_udp_session_event")
989         after = time.time()
990         self.assert_in_range(after - before,
991                              0.9 * detection_time,
992                              1.1 * detection_time,
993                              "time before bfd session goes down")
994         verify_event(self, e, expected_state=BFDState.down)
995
996     def test_modify_detect_mult(self):
997         """ modify detect multiplier """
998         bfd_session_up(self)
999         p = wait_for_bfd_packet(self)
1000         self.vpp_session.modify_parameters(detect_mult=1)
1001         p = wait_for_bfd_packet(
1002             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1003         self.assert_equal(self.vpp_session.detect_mult,
1004                           p[BFD].detect_mult,
1005                           "detect mult")
1006         # poll bit must not be set
1007         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1008                          "Poll bit not set in BFD packet")
1009         self.vpp_session.modify_parameters(detect_mult=10)
1010         p = wait_for_bfd_packet(
1011             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1012         self.assert_equal(self.vpp_session.detect_mult,
1013                           p[BFD].detect_mult,
1014                           "detect mult")
1015         # poll bit must not be set
1016         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1017                          "Poll bit not set in BFD packet")
1018
1019     def test_queued_poll(self):
1020         """ test poll sequence queueing """
1021         bfd_session_up(self)
1022         p = wait_for_bfd_packet(self)
1023         self.vpp_session.modify_parameters(
1024             required_min_rx=2 * self.vpp_session.required_min_rx)
1025         p = wait_for_bfd_packet(self)
1026         poll_sequence_start = time.time()
1027         poll_sequence_length_min = 0.5
1028         send_final_after = time.time() + poll_sequence_length_min
1029         # poll bit needs to be set
1030         self.assertIn("P", p.sprintf("%BFD.flags%"),
1031                       "Poll bit not set in BFD packet")
1032         self.assert_equal(p[BFD].required_min_rx_interval,
1033                           self.vpp_session.required_min_rx,
1034                           "BFD required min rx interval")
1035         self.vpp_session.modify_parameters(
1036             required_min_rx=2 * self.vpp_session.required_min_rx)
1037         # 2nd poll sequence should be queued now
1038         # don't send the reply back yet, wait for some time to emulate
1039         # longer round-trip time
1040         packet_count = 0
1041         while time.time() < send_final_after:
1042             self.test_session.send_packet()
1043             p = wait_for_bfd_packet(self)
1044             self.assert_equal(len(self.vapi.collect_events()), 0,
1045                               "number of bfd events")
1046             self.assert_equal(p[BFD].required_min_rx_interval,
1047                               self.vpp_session.required_min_rx,
1048                               "BFD required min rx interval")
1049             packet_count += 1
1050             # poll bit must be set
1051             self.assertIn("P", p.sprintf("%BFD.flags%"),
1052                           "Poll bit not set in BFD packet")
1053         final = self.test_session.create_packet()
1054         final[BFD].flags = "F"
1055         self.test_session.send_packet(final)
1056         # finish 1st with final
1057         poll_sequence_length = time.time() - poll_sequence_start
1058         # vpp must wait for some time before starting new poll sequence
1059         poll_no_2_started = False
1060         for dummy in range(2 * packet_count):
1061             p = wait_for_bfd_packet(self)
1062             self.assert_equal(len(self.vapi.collect_events()), 0,
1063                               "number of bfd events")
1064             if "P" in p.sprintf("%BFD.flags%"):
1065                 poll_no_2_started = True
1066                 if time.time() < poll_sequence_start + poll_sequence_length:
1067                     raise Exception("VPP started 2nd poll sequence too soon")
1068                 final = self.test_session.create_packet()
1069                 final[BFD].flags = "F"
1070                 self.test_session.send_packet(final)
1071                 break
1072             else:
1073                 self.test_session.send_packet()
1074         self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1075         # finish 2nd with final
1076         final = self.test_session.create_packet()
1077         final[BFD].flags = "F"
1078         self.test_session.send_packet(final)
1079         p = wait_for_bfd_packet(self)
1080         # poll bit must not be set
1081         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1082                          "Poll bit set in BFD packet")
1083
1084     # returning inconsistent results requiring retries in per-patch tests
1085     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1086     def test_poll_response(self):
1087         """ test correct response to control frame with poll bit set """
1088         bfd_session_up(self)
1089         poll = self.test_session.create_packet()
1090         poll[BFD].flags = "P"
1091         self.test_session.send_packet(poll)
1092         final = wait_for_bfd_packet(
1093             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1094         self.assertIn("F", final.sprintf("%BFD.flags%"))
1095
1096     def test_no_periodic_if_remote_demand(self):
1097         """ no periodic frames outside poll sequence if remote demand set """
1098         bfd_session_up(self)
1099         demand = self.test_session.create_packet()
1100         demand[BFD].flags = "D"
1101         self.test_session.send_packet(demand)
1102         transmit_time = 0.9 \
1103             * max(self.vpp_session.required_min_rx,
1104                   self.test_session.desired_min_tx) \
1105             / USEC_IN_SEC
1106         count = 0
1107         for dummy in range(self.test_session.detect_mult * 2):
1108             self.sleep(transmit_time)
1109             self.test_session.send_packet(demand)
1110             try:
1111                 p = wait_for_bfd_packet(self, timeout=0)
1112                 self.logger.error(ppp("Received unexpected packet:", p))
1113                 count += 1
1114             except CaptureTimeoutError:
1115                 pass
1116         events = self.vapi.collect_events()
1117         for e in events:
1118             self.logger.error("Received unexpected event: %s", e)
1119         self.assert_equal(count, 0, "number of packets received")
1120         self.assert_equal(len(events), 0, "number of events received")
1121
1122     def test_echo_looped_back(self):
1123         """ echo packets looped back """
1124         # don't need a session in this case..
1125         self.vpp_session.remove_vpp_config()
1126         self.pg0.enable_capture()
1127         echo_packet_count = 10
1128         # random source port low enough to increment a few times..
1129         udp_sport_tx = randint(1, 50000)
1130         udp_sport_rx = udp_sport_tx
1131         echo_packet = (Ether(src=self.pg0.remote_mac,
1132                              dst=self.pg0.local_mac) /
1133                        IP(src=self.pg0.remote_ip4,
1134                           dst=self.pg0.remote_ip4) /
1135                        UDP(dport=BFD.udp_dport_echo) /
1136                        Raw("this should be looped back"))
1137         for dummy in range(echo_packet_count):
1138             self.sleep(.01, "delay between echo packets")
1139             echo_packet[UDP].sport = udp_sport_tx
1140             udp_sport_tx += 1
1141             self.logger.debug(ppp("Sending packet:", echo_packet))
1142             self.pg0.add_stream(echo_packet)
1143             self.pg_start()
1144         for dummy in range(echo_packet_count):
1145             p = self.pg0.wait_for_packet(1)
1146             self.logger.debug(ppp("Got packet:", p))
1147             ether = p[Ether]
1148             self.assert_equal(self.pg0.remote_mac,
1149                               ether.dst, "Destination MAC")
1150             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1151             ip = p[IP]
1152             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1153             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1154             udp = p[UDP]
1155             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1156                               "UDP destination port")
1157             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1158             udp_sport_rx += 1
1159             # need to compare the hex payload here, otherwise BFD_vpp_echo
1160             # gets in way
1161             self.assertEqual(scapy.compat.raw(p[UDP].payload),
1162                              scapy.compat.raw(echo_packet[UDP].payload),
1163                              "Received packet is not the echo packet sent")
1164         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1165                           "ECHO packet identifier for test purposes)")
1166
1167     def test_echo(self):
1168         """ echo function """
1169         bfd_session_up(self)
1170         self.test_session.update(required_min_echo_rx=150000)
1171         self.test_session.send_packet()
1172         detection_time = self.test_session.detect_mult *\
1173             self.vpp_session.required_min_rx / USEC_IN_SEC
1174         # echo shouldn't work without echo source set
1175         for dummy in range(10):
1176             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1177             self.sleep(sleep, "delay before sending bfd packet")
1178             self.test_session.send_packet()
1179         p = wait_for_bfd_packet(
1180             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1181         self.assert_equal(p[BFD].required_min_rx_interval,
1182                           self.vpp_session.required_min_rx,
1183                           "BFD required min rx interval")
1184         self.test_session.send_packet()
1185         self.vapi.bfd_udp_set_echo_source(
1186             sw_if_index=self.loopback0.sw_if_index)
1187         echo_seen = False
1188         # should be turned on - loopback echo packets
1189         for dummy in range(3):
1190             loop_until = time.time() + 0.75 * detection_time
1191             while time.time() < loop_until:
1192                 p = self.pg0.wait_for_packet(1)
1193                 self.logger.debug(ppp("Got packet:", p))
1194                 if p[UDP].dport == BFD.udp_dport_echo:
1195                     self.assert_equal(
1196                         p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1197                     self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1198                                         "BFD ECHO src IP equal to loopback IP")
1199                     self.logger.debug(ppp("Looping back packet:", p))
1200                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1201                                       "ECHO packet destination MAC address")
1202                     p[Ether].dst = self.pg0.local_mac
1203                     self.pg0.add_stream(p)
1204                     self.pg_start()
1205                     echo_seen = True
1206                 elif p.haslayer(BFD):
1207                     if echo_seen:
1208                         self.assertGreaterEqual(
1209                             p[BFD].required_min_rx_interval,
1210                             1000000)
1211                     if "P" in p.sprintf("%BFD.flags%"):
1212                         final = self.test_session.create_packet()
1213                         final[BFD].flags = "F"
1214                         self.test_session.send_packet(final)
1215                 else:
1216                     raise Exception(ppp("Received unknown packet:", p))
1217
1218                 self.assert_equal(len(self.vapi.collect_events()), 0,
1219                                   "number of bfd events")
1220             self.test_session.send_packet()
1221         self.assertTrue(echo_seen, "No echo packets received")
1222
1223     def test_echo_fail(self):
1224         """ session goes down if echo function fails """
1225         bfd_session_up(self)
1226         self.test_session.update(required_min_echo_rx=150000)
1227         self.test_session.send_packet()
1228         detection_time = self.test_session.detect_mult *\
1229             self.vpp_session.required_min_rx / USEC_IN_SEC
1230         self.vapi.bfd_udp_set_echo_source(
1231             sw_if_index=self.loopback0.sw_if_index)
1232         # echo function should be used now, but we will drop the echo packets
1233         verified_diag = False
1234         for dummy in range(3):
1235             loop_until = time.time() + 0.75 * detection_time
1236             while time.time() < loop_until:
1237                 p = self.pg0.wait_for_packet(1)
1238                 self.logger.debug(ppp("Got packet:", p))
1239                 if p[UDP].dport == BFD.udp_dport_echo:
1240                     # dropped
1241                     pass
1242                 elif p.haslayer(BFD):
1243                     if "P" in p.sprintf("%BFD.flags%"):
1244                         self.assertGreaterEqual(
1245                             p[BFD].required_min_rx_interval,
1246                             1000000)
1247                         final = self.test_session.create_packet()
1248                         final[BFD].flags = "F"
1249                         self.test_session.send_packet(final)
1250                     if p[BFD].state == BFDState.down:
1251                         self.assert_equal(p[BFD].diag,
1252                                           BFDDiagCode.echo_function_failed,
1253                                           BFDDiagCode)
1254                         verified_diag = True
1255                 else:
1256                     raise Exception(ppp("Received unknown packet:", p))
1257             self.test_session.send_packet()
1258         events = self.vapi.collect_events()
1259         self.assert_equal(len(events), 1, "number of bfd events")
1260         self.assert_equal(events[0].state, BFDState.down, BFDState)
1261         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1262
1263     def test_echo_stop(self):
1264         """ echo function stops if peer sets required min echo rx zero """
1265         bfd_session_up(self)
1266         self.test_session.update(required_min_echo_rx=150000)
1267         self.test_session.send_packet()
1268         self.vapi.bfd_udp_set_echo_source(
1269             sw_if_index=self.loopback0.sw_if_index)
1270         # wait for first echo packet
1271         while True:
1272             p = self.pg0.wait_for_packet(1)
1273             self.logger.debug(ppp("Got packet:", p))
1274             if p[UDP].dport == BFD.udp_dport_echo:
1275                 self.logger.debug(ppp("Looping back packet:", p))
1276                 p[Ether].dst = self.pg0.local_mac
1277                 self.pg0.add_stream(p)
1278                 self.pg_start()
1279                 break
1280             elif p.haslayer(BFD):
1281                 # ignore BFD
1282                 pass
1283             else:
1284                 raise Exception(ppp("Received unknown packet:", p))
1285         self.test_session.update(required_min_echo_rx=0)
1286         self.test_session.send_packet()
1287         # echo packets shouldn't arrive anymore
1288         for dummy in range(5):
1289             wait_for_bfd_packet(
1290                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1291             self.test_session.send_packet()
1292             events = self.vapi.collect_events()
1293             self.assert_equal(len(events), 0, "number of bfd events")
1294
1295     def test_echo_source_removed(self):
1296         """ echo function stops if echo source is removed """
1297         bfd_session_up(self)
1298         self.test_session.update(required_min_echo_rx=150000)
1299         self.test_session.send_packet()
1300         self.vapi.bfd_udp_set_echo_source(
1301             sw_if_index=self.loopback0.sw_if_index)
1302         # wait for first echo packet
1303         while True:
1304             p = self.pg0.wait_for_packet(1)
1305             self.logger.debug(ppp("Got packet:", p))
1306             if p[UDP].dport == BFD.udp_dport_echo:
1307                 self.logger.debug(ppp("Looping back packet:", p))
1308                 p[Ether].dst = self.pg0.local_mac
1309                 self.pg0.add_stream(p)
1310                 self.pg_start()
1311                 break
1312             elif p.haslayer(BFD):
1313                 # ignore BFD
1314                 pass
1315             else:
1316                 raise Exception(ppp("Received unknown packet:", p))
1317         self.vapi.bfd_udp_del_echo_source()
1318         self.test_session.send_packet()
1319         # echo packets shouldn't arrive anymore
1320         for dummy in range(5):
1321             wait_for_bfd_packet(
1322                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1323             self.test_session.send_packet()
1324             events = self.vapi.collect_events()
1325             self.assert_equal(len(events), 0, "number of bfd events")
1326
1327     def test_stale_echo(self):
1328         """ stale echo packets don't keep a session up """
1329         bfd_session_up(self)
1330         self.test_session.update(required_min_echo_rx=150000)
1331         self.vapi.bfd_udp_set_echo_source(
1332             sw_if_index=self.loopback0.sw_if_index)
1333         self.test_session.send_packet()
1334         # should be turned on - loopback echo packets
1335         echo_packet = None
1336         timeout_at = None
1337         timeout_ok = False
1338         for dummy in range(10 * self.vpp_session.detect_mult):
1339             p = self.pg0.wait_for_packet(1)
1340             if p[UDP].dport == BFD.udp_dport_echo:
1341                 if echo_packet is None:
1342                     self.logger.debug(ppp("Got first echo packet:", p))
1343                     echo_packet = p
1344                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1345                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1346                 else:
1347                     self.logger.debug(ppp("Got followup echo packet:", p))
1348                 self.logger.debug(ppp("Looping back first echo packet:", p))
1349                 echo_packet[Ether].dst = self.pg0.local_mac
1350                 self.pg0.add_stream(echo_packet)
1351                 self.pg_start()
1352             elif p.haslayer(BFD):
1353                 self.logger.debug(ppp("Got packet:", p))
1354                 if "P" in p.sprintf("%BFD.flags%"):
1355                     final = self.test_session.create_packet()
1356                     final[BFD].flags = "F"
1357                     self.test_session.send_packet(final)
1358                 if p[BFD].state == BFDState.down:
1359                     self.assertIsNotNone(
1360                         timeout_at,
1361                         "Session went down before first echo packet received")
1362                     now = time.time()
1363                     self.assertGreaterEqual(
1364                         now, timeout_at,
1365                         "Session timeout at %s, but is expected at %s" %
1366                         (now, timeout_at))
1367                     self.assert_equal(p[BFD].diag,
1368                                       BFDDiagCode.echo_function_failed,
1369                                       BFDDiagCode)
1370                     events = self.vapi.collect_events()
1371                     self.assert_equal(len(events), 1, "number of bfd events")
1372                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1373                     timeout_ok = True
1374                     break
1375             else:
1376                 raise Exception(ppp("Received unknown packet:", p))
1377             self.test_session.send_packet()
1378         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1379
1380     def test_invalid_echo_checksum(self):
1381         """ echo packets with invalid checksum don't keep a session up """
1382         bfd_session_up(self)
1383         self.test_session.update(required_min_echo_rx=150000)
1384         self.vapi.bfd_udp_set_echo_source(
1385             sw_if_index=self.loopback0.sw_if_index)
1386         self.test_session.send_packet()
1387         # should be turned on - loopback echo packets
1388         timeout_at = None
1389         timeout_ok = False
1390         for dummy in range(10 * self.vpp_session.detect_mult):
1391             p = self.pg0.wait_for_packet(1)
1392             if p[UDP].dport == BFD.udp_dport_echo:
1393                 self.logger.debug(ppp("Got echo packet:", p))
1394                 if timeout_at is None:
1395                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1396                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1397                 p[BFD_vpp_echo].checksum = getrandbits(64)
1398                 p[Ether].dst = self.pg0.local_mac
1399                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1400                 self.pg0.add_stream(p)
1401                 self.pg_start()
1402             elif p.haslayer(BFD):
1403                 self.logger.debug(ppp("Got packet:", p))
1404                 if "P" in p.sprintf("%BFD.flags%"):
1405                     final = self.test_session.create_packet()
1406                     final[BFD].flags = "F"
1407                     self.test_session.send_packet(final)
1408                 if p[BFD].state == BFDState.down:
1409                     self.assertIsNotNone(
1410                         timeout_at,
1411                         "Session went down before first echo packet received")
1412                     now = time.time()
1413                     self.assertGreaterEqual(
1414                         now, timeout_at,
1415                         "Session timeout at %s, but is expected at %s" %
1416                         (now, timeout_at))
1417                     self.assert_equal(p[BFD].diag,
1418                                       BFDDiagCode.echo_function_failed,
1419                                       BFDDiagCode)
1420                     events = self.vapi.collect_events()
1421                     self.assert_equal(len(events), 1, "number of bfd events")
1422                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1423                     timeout_ok = True
1424                     break
1425             else:
1426                 raise Exception(ppp("Received unknown packet:", p))
1427             self.test_session.send_packet()
1428         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1429
1430     def test_admin_up_down(self):
1431         """ put session admin-up and admin-down """
1432         bfd_session_up(self)
1433         self.vpp_session.admin_down()
1434         self.pg0.enable_capture()
1435         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1436         verify_event(self, e, expected_state=BFDState.admin_down)
1437         for dummy in range(2):
1438             p = wait_for_bfd_packet(self)
1439             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1440         # try to bring session up - shouldn't be possible
1441         self.test_session.update(state=BFDState.init)
1442         self.test_session.send_packet()
1443         for dummy in range(2):
1444             p = wait_for_bfd_packet(self)
1445             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1446         self.vpp_session.admin_up()
1447         self.test_session.update(state=BFDState.down)
1448         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1449         verify_event(self, e, expected_state=BFDState.down)
1450         p = wait_for_bfd_packet(
1451             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1452         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1453         self.test_session.send_packet()
1454         p = wait_for_bfd_packet(
1455             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1456         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1457         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1458         verify_event(self, e, expected_state=BFDState.init)
1459         self.test_session.update(state=BFDState.up)
1460         self.test_session.send_packet()
1461         p = wait_for_bfd_packet(
1462             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1463         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1464         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1465         verify_event(self, e, expected_state=BFDState.up)
1466
1467     def test_config_change_remote_demand(self):
1468         """ configuration change while peer in demand mode """
1469         bfd_session_up(self)
1470         demand = self.test_session.create_packet()
1471         demand[BFD].flags = "D"
1472         self.test_session.send_packet(demand)
1473         self.vpp_session.modify_parameters(
1474             required_min_rx=2 * self.vpp_session.required_min_rx)
1475         p = wait_for_bfd_packet(
1476             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1477         # poll bit must be set
1478         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1479         # terminate poll sequence
1480         final = self.test_session.create_packet()
1481         final[BFD].flags = "D+F"
1482         self.test_session.send_packet(final)
1483         # vpp should be quiet now again
1484         transmit_time = 0.9 \
1485             * max(self.vpp_session.required_min_rx,
1486                   self.test_session.desired_min_tx) \
1487             / USEC_IN_SEC
1488         count = 0
1489         for dummy in range(self.test_session.detect_mult * 2):
1490             self.sleep(transmit_time)
1491             self.test_session.send_packet(demand)
1492             try:
1493                 p = wait_for_bfd_packet(self, timeout=0)
1494                 self.logger.error(ppp("Received unexpected packet:", p))
1495                 count += 1
1496             except CaptureTimeoutError:
1497                 pass
1498         events = self.vapi.collect_events()
1499         for e in events:
1500             self.logger.error("Received unexpected event: %s", e)
1501         self.assert_equal(count, 0, "number of packets received")
1502         self.assert_equal(len(events), 0, "number of events received")
1503
1504     def test_intf_deleted(self):
1505         """ interface with bfd session deleted """
1506         intf = VppLoInterface(self)
1507         intf.config_ip4()
1508         intf.admin_up()
1509         sw_if_index = intf.sw_if_index
1510         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1511         vpp_session.add_vpp_config()
1512         vpp_session.admin_up()
1513         intf.remove_vpp_config()
1514         e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
1515         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1516         self.assertFalse(vpp_session.query_vpp_config())
1517
1518
@tag_run_solo
@tag_fixme_vpp_workers
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # Class-level defaults. pg0 is created in setUpClass(); vpp_session and
    # test_session are created per test in setUp().
    # NOTE(review): vpp_clock_offset is never assigned in this class -
    # presumably a helper such as bfd_session_up() sets it; confirm.
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ Enable BFD debug logging, create pg0 and one loopback (IPv6)

        If interface setup fails, the parent class teardown is run before
        the exception is re-raised.
        """
        super(BFD6TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            cls.create_loopback_interfaces(1)
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            super(BFD6TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ Delegate class teardown to the parent class """
        super(BFD6TestCase, cls).tearDownClass()

    def setUp(self):
        """ Register for BFD events and create the VPP and test sessions

        If session creation fails, the BFD event registration is undone
        before the exception is re-raised.
        """
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except BaseException:
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """ Unregister from BFD events (if VPP is alive), drain the queue """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair

        The first packet is sent with a random "my discriminator" and
        without updating "your discriminator" first. VPP's reply must echo
        our discriminator and carry state init; the session is then
        brought up.
        """
        self.logger.info("BFD: Sending Slow control frame")
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        # adopt the discriminator VPP advertised in its reply
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_hold_up(self):
        """ hold BFD session up

        Answers 2 * detect_mult packets from VPP and checks that no event
        was raised and the session is still up.
        """
        bfd_session_up(self)
        for dummy in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back

        Sends packets to the BFD echo port with source and destination IP
        both set to pg0's remote address and consecutive UDP source ports,
        then checks each one comes back with swapped MACs, unchanged IPs,
        ports and payload, in the order sent.
        """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        for dummy in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            # the source port doubles as a per-packet identifier
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for dummy in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(scapy.compat.raw(p[UDP].payload),
                             scapy.compat.raw(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        # both counters advanced once per packet, so they must match again
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    def test_echo(self):
        """ echo function

        Without an echo source configured, VPP must keep advertising its
        configured required min rx. Once loopback0 is set as the echo
        source, echo packets are expected; they are looped back, and BFD
        control packets seen after the first echo must advertise a
        required min rx of at least 1 second. No BFD event may be raised.
        """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for dummy in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(
            sw_if_index=self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for dummy in range(3):
            # process captures for 3/4 of the detection time, then send a
            # control packet of our own
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        # answer a poll with a final
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

    def test_intf_deleted(self):
        """ interface with bfd session deleted

        Creates a loopback with an IPv6 BFD session on it, removes the
        loopback and checks that a session event for that interface index
        is received and that the session is no longer present in VPP.
        """
        intf = VppLoInterface(self)
        intf.config_ip6()
        intf.admin_up()
        # remember the index before the interface is removed
        sw_if_index = intf.sw_if_index
        vpp_session = VppBFDUDPSession(
            self, intf, intf.remote_ip6, af=AF_INET6)
        vpp_session.add_vpp_config()
        vpp_session.admin_up()
        intf.remove_vpp_config()
        e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
        self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
        self.assertFalse(vpp_session.query_vpp_config())
1732
1733
@tag_run_solo
class BFDFIBTestCase(VppTestCase):
    """ BFD-FIB interactions (IPv6) """

    # both sessions are created inside the test itself
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ Delegate class setup to the parent class """
        super(BFDFIBTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """ Delegate class teardown to the parent class """
        super(BFDFIBTestCase, cls).tearDownClass()

    def setUp(self):
        """ Create pg0, register for BFD events, configure IPv6 on pg0 """
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

    def tearDown(self):
        """ Unregister from BFD events if VPP is still alive """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def _send_and_expect_forwarded(self, packets):
        """ Inject packets on pg0 and check each one is captured again

        For every packet sent, the next captured packet that is not
        filtered out by pkt_is_not_data_traffic() must carry the same IPv6
        destination address.
        """
        self.pg0.add_stream(packets)
        self.pg_start()
        for packet in packets:
            captured = self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)
            self.assertEqual(captured[IPv6].dst,
                             packet[IPv6].dst)

    def test_session_with_fib(self):
        """ BFD-FIB interactions

        Installs two routes via pg0's remote address, runs a BFD session
        towards that address and checks that data traffic matching the
        routes is captured while the session is up and is not captured
        while the session is down.
        """

        # packets to match against both of the routes
        p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst="2001::1") /
              UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100)),
             (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst="2002::1") /
              UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))]

        # A recursive and a non-recursive route via a next-hop that
        # will have a BFD session
        ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                self.pg0.sw_if_index)])
        ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
                                  [VppRoutePath(self.pg0.remote_ip6,
                                                0xffffffff)])
        ip_2001_s_64.add_vpp_config()
        ip_2002_s_64.add_vpp_config()

        # bring the session up now the routes are present
        self.vpp_session = VppBFDUDPSession(self,
                                            self.pg0,
                                            self.pg0.remote_ip6,
                                            af=AF_INET6)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        self._send_and_expect_forwarded(p)

        # session is down - traffic is dropped
        bfd_session_down(self)

        self.pg0.add_stream(p)
        self.pg_start()
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(
                1,
                filter_out_fn=self.pkt_is_not_data_traffic)

        # session is up again - traffic passes
        bfd_session_up(self)
        self._send_and_expect_forwarded(p)
1838
1839
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDTunTestCase(VppTestCase):
    """ BFD over GRE tunnel """

    # both sessions are created inside the test itself
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ Delegate class setup to the parent class """
        super(BFDTunTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """ Delegate class teardown to the parent class """
        super(BFDTunTestCase, cls).tearDownClass()

    def setUp(self):
        """ Create pg0, register for BFD events, configure IPv4 on pg0 """
        super(BFDTunTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        for pg_if in self.pg_interfaces:
            pg_if.admin_up()
            pg_if.config_ip4()
            pg_if.resolve_arp()

    def tearDown(self):
        """ Unregister from BFD events if VPP is still alive """
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDTunTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ True for BFD packets and for whatever is_ipv6_misc() matches """
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_bfd_o_gre(self):
        """ BFD-o-GRE

        Runs a BFD session over a GRE tunnel built between pg0's local and
        remote IPv4 addresses, checks that a data packet sent on pg0 is
        captured on pg0 while the session is up, then takes the session
        down.
        """

        # the GRE tunnel the BFD session runs over
        tunnel = VppGreInterface(self,
                                 self.pg0.local_ip4,
                                 self.pg0.remote_ip4)
        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()

        # VPP side of the session, flagged as running over a tunnel
        self.vpp_session = VppBFDUDPSession(self,
                                            tunnel,
                                            tunnel.remote_ip4,
                                            is_tunnel=True)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()

        # test side of the session: IP/GRE tunnel header, sent via pg0
        outer_header = (IP(src=self.pg0.remote_ip4,
                           dst=self.pg0.local_ip4) /
                        GRE())
        self.test_session = BFDTestSession(
            self, tunnel, AF_INET,
            tunnel_header=outer_header,
            phy_interface=self.pg0)

        # one data packet addressed to the tunnel's remote IPv4 address
        data_packets = [
            (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=self.pg0.remote_ip4, dst=tunnel.remote_ip4) /
             UDP(sport=1234, dport=1234) /
             Raw(b'\xa5' * 100))]

        # session is up - traffic passes
        bfd_session_up(self)

        self.send_and_expect(self.pg0, data_packets, self.pg0)

        # bring session down
        bfd_session_down(self)
1919
1920
1921 @tag_run_solo
1922 class BFDSHA1TestCase(VppTestCase):
1923     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1924
1925     pg0 = None
1926     vpp_clock_offset = None
1927     vpp_session = None
1928     test_session = None
1929
1930     @classmethod
1931     def setUpClass(cls):
1932         super(BFDSHA1TestCase, cls).setUpClass()
1933         cls.vapi.cli("set log class bfd level debug")
1934         try:
1935             cls.create_pg_interfaces([0])
1936             cls.pg0.config_ip4()
1937             cls.pg0.admin_up()
1938             cls.pg0.resolve_arp()
1939
1940         except Exception:
1941             super(BFDSHA1TestCase, cls).tearDownClass()
1942             raise
1943
1944     @classmethod
1945     def tearDownClass(cls):
1946         super(BFDSHA1TestCase, cls).tearDownClass()
1947
1948     def setUp(self):
1949         super(BFDSHA1TestCase, self).setUp()
1950         self.factory = AuthKeyFactory()
1951         self.vapi.want_bfd_events()
1952         self.pg0.enable_capture()
1953
1954     def tearDown(self):
1955         if not self.vpp_dead:
1956             self.vapi.want_bfd_events(enable_disable=False)
1957         self.vapi.collect_events()  # clear the event queue
1958         super(BFDSHA1TestCase, self).tearDown()
1959
1960     def test_session_up(self):
1961         """ bring BFD session up """
1962         key = self.factory.create_random_key(self)
1963         key.add_vpp_config()
1964         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1965                                             self.pg0.remote_ip4,
1966                                             sha1_key=key)
1967         self.vpp_session.add_vpp_config()
1968         self.vpp_session.admin_up()
1969         self.test_session = BFDTestSession(
1970             self, self.pg0, AF_INET, sha1_key=key,
1971             bfd_key_id=self.vpp_session.bfd_key_id)
1972         bfd_session_up(self)
1973
1974     def test_hold_up(self):
1975         """ hold BFD session up """
1976         key = self.factory.create_random_key(self)
1977         key.add_vpp_config()
1978         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1979                                             self.pg0.remote_ip4,
1980                                             sha1_key=key)
1981         self.vpp_session.add_vpp_config()
1982         self.vpp_session.admin_up()
1983         self.test_session = BFDTestSession(
1984             self, self.pg0, AF_INET, sha1_key=key,
1985             bfd_key_id=self.vpp_session.bfd_key_id)
1986         bfd_session_up(self)
1987         for dummy in range(self.test_session.detect_mult * 2):
1988             wait_for_bfd_packet(self)
1989             self.test_session.send_packet()
1990         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1991
1992     def test_hold_up_meticulous(self):
1993         """ hold BFD session up - meticulous auth """
1994         key = self.factory.create_random_key(
1995             self, BFDAuthType.meticulous_keyed_sha1)
1996         key.add_vpp_config()
1997         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1998                                             self.pg0.remote_ip4, sha1_key=key)
1999         self.vpp_session.add_vpp_config()
2000         self.vpp_session.admin_up()
2001         # specify sequence number so that it wraps
2002         self.test_session = BFDTestSession(
2003             self, self.pg0, AF_INET, sha1_key=key,
2004             bfd_key_id=self.vpp_session.bfd_key_id,
2005             our_seq_number=0xFFFFFFFF - 4)
2006         bfd_session_up(self)
2007         for dummy in range(30):
2008             wait_for_bfd_packet(self)
2009             self.test_session.inc_seq_num()
2010             self.test_session.send_packet()
2011         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2012
2013     def test_send_bad_seq_number(self):
2014         """ session is not kept alive by msgs with bad sequence numbers"""
2015         key = self.factory.create_random_key(
2016             self, BFDAuthType.meticulous_keyed_sha1)
2017         key.add_vpp_config()
2018         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2019                                             self.pg0.remote_ip4, sha1_key=key)
2020         self.vpp_session.add_vpp_config()
2021         self.test_session = BFDTestSession(
2022             self, self.pg0, AF_INET, sha1_key=key,
2023             bfd_key_id=self.vpp_session.bfd_key_id)
2024         bfd_session_up(self)
2025         detection_time = self.test_session.detect_mult *\
2026             self.vpp_session.required_min_rx / USEC_IN_SEC
2027         send_until = time.time() + 2 * detection_time
2028         while time.time() < send_until:
2029             self.test_session.send_packet()
2030             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
2031                        "time between bfd packets")
2032         e = self.vapi.collect_events()
2033         # session should be down now, because the sequence numbers weren't
2034         # updated
2035         self.assert_equal(len(e), 1, "number of bfd events")
2036         verify_event(self, e[0], expected_state=BFDState.down)
2037
2038     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
2039                                        legitimate_test_session,
2040                                        rogue_test_session,
2041                                        rogue_bfd_values=None):
2042         """ execute a rogue session interaction scenario
2043
2044         1. create vpp session, add config
2045         2. bring the legitimate session up
2046         3. copy the bfd values from legitimate session to rogue session
2047         4. apply rogue_bfd_values to rogue session
2048         5. set rogue session state to down
2049         6. send message to take the session down from the rogue session
2050         7. assert that the legitimate session is unaffected
2051         """
2052
2053         self.vpp_session = vpp_bfd_udp_session
2054         self.vpp_session.add_vpp_config()
2055         self.test_session = legitimate_test_session
2056         # bring vpp session up
2057         bfd_session_up(self)
2058         # send packet from rogue session
2059         rogue_test_session.update(
2060             my_discriminator=self.test_session.my_discriminator,
2061             your_discriminator=self.test_session.your_discriminator,
2062             desired_min_tx=self.test_session.desired_min_tx,
2063             required_min_rx=self.test_session.required_min_rx,
2064             detect_mult=self.test_session.detect_mult,
2065             diag=self.test_session.diag,
2066             state=self.test_session.state,
2067             auth_type=self.test_session.auth_type)
2068         if rogue_bfd_values:
2069             rogue_test_session.update(**rogue_bfd_values)
2070         rogue_test_session.update(state=BFDState.down)
2071         rogue_test_session.send_packet()
2072         wait_for_bfd_packet(self)
2073         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2074
2075     def test_mismatch_auth(self):
2076         """ session is not brought down by unauthenticated msg """
2077         key = self.factory.create_random_key(self)
2078         key.add_vpp_config()
2079         vpp_session = VppBFDUDPSession(
2080             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2081         legitimate_test_session = BFDTestSession(
2082             self, self.pg0, AF_INET, sha1_key=key,
2083             bfd_key_id=vpp_session.bfd_key_id)
2084         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2085         self.execute_rogue_session_scenario(vpp_session,
2086                                             legitimate_test_session,
2087                                             rogue_test_session)
2088
2089     def test_mismatch_bfd_key_id(self):
2090         """ session is not brought down by msg with non-existent key-id """
2091         key = self.factory.create_random_key(self)
2092         key.add_vpp_config()
2093         vpp_session = VppBFDUDPSession(
2094             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2095         # pick a different random bfd key id
2096         x = randint(0, 255)
2097         while x == vpp_session.bfd_key_id:
2098             x = randint(0, 255)
2099         legitimate_test_session = BFDTestSession(
2100             self, self.pg0, AF_INET, sha1_key=key,
2101             bfd_key_id=vpp_session.bfd_key_id)
2102         rogue_test_session = BFDTestSession(
2103             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2104         self.execute_rogue_session_scenario(vpp_session,
2105                                             legitimate_test_session,
2106                                             rogue_test_session)
2107
2108     def test_mismatched_auth_type(self):
2109         """ session is not brought down by msg with wrong auth type """
2110         key = self.factory.create_random_key(self)
2111         key.add_vpp_config()
2112         vpp_session = VppBFDUDPSession(
2113             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2114         legitimate_test_session = BFDTestSession(
2115             self, self.pg0, AF_INET, sha1_key=key,
2116             bfd_key_id=vpp_session.bfd_key_id)
2117         rogue_test_session = BFDTestSession(
2118             self, self.pg0, AF_INET, sha1_key=key,
2119             bfd_key_id=vpp_session.bfd_key_id)
2120         self.execute_rogue_session_scenario(
2121             vpp_session, legitimate_test_session, rogue_test_session,
2122             {'auth_type': BFDAuthType.keyed_md5})
2123
2124     def test_restart(self):
2125         """ simulate remote peer restart and resynchronization """
2126         key = self.factory.create_random_key(
2127             self, BFDAuthType.meticulous_keyed_sha1)
2128         key.add_vpp_config()
2129         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2130                                             self.pg0.remote_ip4, sha1_key=key)
2131         self.vpp_session.add_vpp_config()
2132         self.test_session = BFDTestSession(
2133             self, self.pg0, AF_INET, sha1_key=key,
2134             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2135         bfd_session_up(self)
2136         # don't send any packets for 2*detection_time
2137         detection_time = self.test_session.detect_mult *\
2138             self.vpp_session.required_min_rx / USEC_IN_SEC
2139         self.sleep(2 * detection_time, "simulating peer restart")
2140         events = self.vapi.collect_events()
2141         self.assert_equal(len(events), 1, "number of bfd events")
2142         verify_event(self, events[0], expected_state=BFDState.down)
2143         self.test_session.update(state=BFDState.down)
2144         # reset sequence number
2145         self.test_session.our_seq_number = 0
2146         self.test_session.vpp_seq_number = None
2147         # now throw away any pending packets
2148         self.pg0.enable_capture()
2149         self.test_session.my_discriminator = 0
2150         bfd_session_up(self)
2151
2152
@tag_run_solo
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    # Every test in this class follows the same pattern: bring a session
    # up, exchange packets, change the authentication settings on the vpp
    # side (immediately or delayed), follow the change on the test side and
    # verify that the session stayed up and that no BFD event was reported.

    # packet-generator interface, created in setUpClass
    pg0 = None
    # vpp side of the session, set by each test
    vpp_session = None
    # test (peer) side of the session, set by each test
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ create and configure the pg0 interface used by all tests """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # undo the framework's class setup before propagating the error
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ delegate class teardown to the framework """
        super(BFDAuthOnOffTestCase, cls).tearDownClass()

    def setUp(self):
        """ create a key factory, subscribe to BFD events, start capture """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from BFD events and clear the event queue """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=False)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _exchange_keepalives(self, verify_state=True, bump_seq_num=False):
        """ answer detect_mult * 2 packets received from vpp

        :param verify_state: assert that every received packet reports
            the up state
        :param bump_seq_num: call inc_seq_num() on the test session before
            each packet is sent
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            if verify_state:
                self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if bump_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _assert_session_undisturbed(self):
        """ assert the vpp session is up and no BFD event was reported """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_keepalives()
        # switch both sides to the key at once
        self.vpp_session.activate_auth(key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._exchange_keepalives()
        self._assert_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_keepalives(bump_seq_num=True)
        # drop authentication on both sides at once
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._exchange_keepalives(bump_seq_num=True)
        self._assert_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_keepalives()
        # switch both sides from key1 to key2 at once
        self.vpp_session.activate_auth(key2)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._exchange_keepalives()
        self._assert_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # the state reported by vpp is not checked in this first round
        self._exchange_keepalives(verify_state=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # the test side keeps sending without the key for a while
        self._exchange_keepalives()
        # now the test side starts using the key as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._exchange_keepalives()
        self._assert_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_keepalives()
        self.vpp_session.deactivate_auth(delayed=True)
        # the test side keeps sending with the key for a while
        self._exchange_keepalives()
        # now the test side stops using the key as well
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._exchange_keepalives()
        self._assert_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_keepalives()
        self.vpp_session.activate_auth(key2, delayed=True)
        # the test side keeps sending with key1 for a while
        self._exchange_keepalives()
        # now the test side switches to key2 as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._exchange_keepalives()
        self._assert_session_undisturbed()
2364
2365
2366 @tag_run_solo
2367 class BFDCLITestCase(VppTestCase):
2368     """Bidirectional Forwarding Detection (BFD) (CLI) """
2369     pg0 = None
2370
2371     @classmethod
2372     def setUpClass(cls):
2373         super(BFDCLITestCase, cls).setUpClass()
2374         cls.vapi.cli("set log class bfd level debug")
2375         try:
2376             cls.create_pg_interfaces((0,))
2377             cls.pg0.config_ip4()
2378             cls.pg0.config_ip6()
2379             cls.pg0.resolve_arp()
2380             cls.pg0.resolve_ndp()
2381
2382         except Exception:
2383             super(BFDCLITestCase, cls).tearDownClass()
2384             raise
2385
2386     @classmethod
2387     def tearDownClass(cls):
2388         super(BFDCLITestCase, cls).tearDownClass()
2389
2390     def setUp(self):
2391         super(BFDCLITestCase, self).setUp()
2392         self.factory = AuthKeyFactory()
2393         self.pg0.enable_capture()
2394
2395     def tearDown(self):
2396         try:
2397             self.vapi.want_bfd_events(enable_disable=False)
2398         except UnexpectedApiReturnValueError:
2399             # some tests aren't subscribed, so this is not an issue
2400             pass
2401         self.vapi.collect_events()  # clear the event queue
2402         super(BFDCLITestCase, self).tearDown()
2403
2404     def cli_verify_no_response(self, cli):
2405         """ execute a CLI, asserting that the response is empty """
2406         self.assert_equal(self.vapi.cli(cli),
2407                           "",
2408                           "CLI command response")
2409
2410     def cli_verify_response(self, cli, expected):
2411         """ execute a CLI, asserting that the response matches expectation """
2412         try:
2413             reply = self.vapi.cli(cli)
2414         except CliFailedCommandError as cli_error:
2415             reply = str(cli_error)
2416         self.assert_equal(reply.strip(),
2417                           expected,
2418                           "CLI command response")
2419
2420     def test_show(self):
2421         """ show commands """
2422         k1 = self.factory.create_random_key(self)
2423         k1.add_vpp_config()
2424         k2 = self.factory.create_random_key(
2425             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2426         k2.add_vpp_config()
2427         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2428         s1.add_vpp_config()
2429         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2430                               sha1_key=k2)
2431         s2.add_vpp_config()
2432         self.logger.info(self.vapi.ppcli("show bfd keys"))
2433         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2434         self.logger.info(self.vapi.ppcli("show bfd"))
2435
2436     def test_set_del_sha1_key(self):
2437         """ set/delete SHA1 auth key """
2438         k = self.factory.create_random_key(self)
2439         self.registry.register(k, self.logger)
2440         self.cli_verify_no_response(
2441             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2442             (k.conf_key_id,
2443                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2444         self.assertTrue(k.query_vpp_config())
2445         self.vpp_session = VppBFDUDPSession(
2446             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2447         self.vpp_session.add_vpp_config()
2448         self.test_session = \
2449             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2450                            bfd_key_id=self.vpp_session.bfd_key_id)
2451         self.vapi.want_bfd_events()
2452         bfd_session_up(self)
2453         bfd_session_down(self)
2454         # try to replace the secret for the key - should fail because the key
2455         # is in-use
2456         k2 = self.factory.create_random_key(self)
2457         self.cli_verify_response(
2458             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2459             (k.conf_key_id,
2460                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2461             "bfd key set: `bfd_auth_set_key' API call failed, "
2462             "rv=-103:BFD object in use")
2463         # manipulating the session using old secret should still work
2464         bfd_session_up(self)
2465         bfd_session_down(self)
2466         self.vpp_session.remove_vpp_config()
2467         self.cli_verify_no_response(
2468             "bfd key del conf-key-id %s" % k.conf_key_id)
2469         self.assertFalse(k.query_vpp_config())
2470
2471     def test_set_del_meticulous_sha1_key(self):
2472         """ set/delete meticulous SHA1 auth key """
2473         k = self.factory.create_random_key(
2474             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2475         self.registry.register(k, self.logger)
2476         self.cli_verify_no_response(
2477             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2478             (k.conf_key_id,
2479                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2480         self.assertTrue(k.query_vpp_config())
2481         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2482                                             self.pg0.remote_ip6, af=AF_INET6,
2483                                             sha1_key=k)
2484         self.vpp_session.add_vpp_config()
2485         self.vpp_session.admin_up()
2486         self.test_session = \
2487             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2488                            bfd_key_id=self.vpp_session.bfd_key_id)
2489         self.vapi.want_bfd_events()
2490         bfd_session_up(self)
2491         bfd_session_down(self)
2492         # try to replace the secret for the key - should fail because the key
2493         # is in-use
2494         k2 = self.factory.create_random_key(self)
2495         self.cli_verify_response(
2496             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2497             (k.conf_key_id,
2498                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2499             "bfd key set: `bfd_auth_set_key' API call failed, "
2500             "rv=-103:BFD object in use")
2501         # manipulating the session using old secret should still work
2502         bfd_session_up(self)
2503         bfd_session_down(self)
2504         self.vpp_session.remove_vpp_config()
2505         self.cli_verify_no_response(
2506             "bfd key del conf-key-id %s" % k.conf_key_id)
2507         self.assertFalse(k.query_vpp_config())
2508
2509     def test_add_mod_del_bfd_udp(self):
2510         """ create/modify/delete IPv4 BFD UDP session """
2511         vpp_session = VppBFDUDPSession(
2512             self, self.pg0, self.pg0.remote_ip4)
2513         self.registry.register(vpp_session, self.logger)
2514         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2515             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2516             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2517                                 self.pg0.remote_ip4,
2518                                 vpp_session.desired_min_tx,
2519                                 vpp_session.required_min_rx,
2520                                 vpp_session.detect_mult)
2521         self.cli_verify_no_response(cli_add_cmd)
2522         # 2nd add should fail
2523         self.cli_verify_response(
2524             cli_add_cmd,
2525             "bfd udp session add: `bfd_add_add_session' API call"
2526             " failed, rv=-101:Duplicate BFD object")
2527         verify_bfd_session_config(self, vpp_session)
2528         mod_session = VppBFDUDPSession(
2529             self, self.pg0, self.pg0.remote_ip4,
2530             required_min_rx=2 * vpp_session.required_min_rx,
2531             desired_min_tx=3 * vpp_session.desired_min_tx,
2532             detect_mult=4 * vpp_session.detect_mult)
2533         self.cli_verify_no_response(
2534             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2535             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2536             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2537              mod_session.desired_min_tx, mod_session.required_min_rx,
2538              mod_session.detect_mult))
2539         verify_bfd_session_config(self, mod_session)
2540         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2541             "peer-addr %s" % (self.pg0.name,
2542                               self.pg0.local_ip4, self.pg0.remote_ip4)
2543         self.cli_verify_no_response(cli_del_cmd)
2544         # 2nd del is expected to fail
2545         self.cli_verify_response(
2546             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2547             " failed, rv=-102:No such BFD object")
2548         self.assertFalse(vpp_session.query_vpp_config())
2549
2550     def test_add_mod_del_bfd_udp6(self):
2551         """ create/modify/delete IPv6 BFD UDP session """
2552         vpp_session = VppBFDUDPSession(
2553             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2554         self.registry.register(vpp_session, self.logger)
2555         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2556             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2557             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2558                                 self.pg0.remote_ip6,
2559                                 vpp_session.desired_min_tx,
2560                                 vpp_session.required_min_rx,
2561                                 vpp_session.detect_mult)
2562         self.cli_verify_no_response(cli_add_cmd)
2563         # 2nd add should fail
2564         self.cli_verify_response(
2565             cli_add_cmd,
2566             "bfd udp session add: `bfd_add_add_session' API call"
2567             " failed, rv=-101:Duplicate BFD object")
2568         verify_bfd_session_config(self, vpp_session)
2569         mod_session = VppBFDUDPSession(
2570             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2571             required_min_rx=2 * vpp_session.required_min_rx,
2572             desired_min_tx=3 * vpp_session.desired_min_tx,
2573             detect_mult=4 * vpp_session.detect_mult)
2574         self.cli_verify_no_response(
2575             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2576             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2577             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2578              mod_session.desired_min_tx,
2579              mod_session.required_min_rx, mod_session.detect_mult))
2580         verify_bfd_session_config(self, mod_session)
2581         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2582             "peer-addr %s" % (self.pg0.name,
2583                               self.pg0.local_ip6, self.pg0.remote_ip6)
2584         self.cli_verify_no_response(cli_del_cmd)
2585         # 2nd del is expected to fail
2586         self.cli_verify_response(
2587             cli_del_cmd,
2588             "bfd udp session del: `bfd_udp_del_session' API call"
2589             " failed, rv=-102:No such BFD object")
2590         self.assertFalse(vpp_session.query_vpp_config())
2591
2592     def test_add_mod_del_bfd_udp_auth(self):
2593         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2594         key = self.factory.create_random_key(self)
2595         key.add_vpp_config()
2596         vpp_session = VppBFDUDPSession(
2597             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2598         self.registry.register(vpp_session, self.logger)
2599         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2600             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2601             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2602             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2603                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2604                vpp_session.detect_mult, key.conf_key_id,
2605                vpp_session.bfd_key_id)
2606         self.cli_verify_no_response(cli_add_cmd)
2607         # 2nd add should fail
2608         self.cli_verify_response(
2609             cli_add_cmd,
2610             "bfd udp session add: `bfd_add_add_session' API call"
2611             " failed, rv=-101:Duplicate BFD object")
2612         verify_bfd_session_config(self, vpp_session)
2613         mod_session = VppBFDUDPSession(
2614             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2615             bfd_key_id=vpp_session.bfd_key_id,
2616             required_min_rx=2 * vpp_session.required_min_rx,
2617             desired_min_tx=3 * vpp_session.desired_min_tx,
2618             detect_mult=4 * vpp_session.detect_mult)
2619         self.cli_verify_no_response(
2620             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2621             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2622             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2623              mod_session.desired_min_tx,
2624              mod_session.required_min_rx, mod_session.detect_mult))
2625         verify_bfd_session_config(self, mod_session)
2626         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2627             "peer-addr %s" % (self.pg0.name,
2628                               self.pg0.local_ip4, self.pg0.remote_ip4)
2629         self.cli_verify_no_response(cli_del_cmd)
2630         # 2nd del is expected to fail
2631         self.cli_verify_response(
2632             cli_del_cmd,
2633             "bfd udp session del: `bfd_udp_del_session' API call"
2634             " failed, rv=-102:No such BFD object")
2635         self.assertFalse(vpp_session.query_vpp_config())
2636
2637     def test_add_mod_del_bfd_udp6_auth(self):
2638         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2639         key = self.factory.create_random_key(
2640             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2641         key.add_vpp_config()
2642         vpp_session = VppBFDUDPSession(
2643             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2644         self.registry.register(vpp_session, self.logger)
2645         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2646             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2647             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2648             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2649                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2650                vpp_session.detect_mult, key.conf_key_id,
2651                vpp_session.bfd_key_id)
2652         self.cli_verify_no_response(cli_add_cmd)
2653         # 2nd add should fail
2654         self.cli_verify_response(
2655             cli_add_cmd,
2656             "bfd udp session add: `bfd_add_add_session' API call"
2657             " failed, rv=-101:Duplicate BFD object")
2658         verify_bfd_session_config(self, vpp_session)
2659         mod_session = VppBFDUDPSession(
2660             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2661             bfd_key_id=vpp_session.bfd_key_id,
2662             required_min_rx=2 * vpp_session.required_min_rx,
2663             desired_min_tx=3 * vpp_session.desired_min_tx,
2664             detect_mult=4 * vpp_session.detect_mult)
2665         self.cli_verify_no_response(
2666             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2667             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2668             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2669              mod_session.desired_min_tx,
2670              mod_session.required_min_rx, mod_session.detect_mult))
2671         verify_bfd_session_config(self, mod_session)
2672         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2673             "peer-addr %s" % (self.pg0.name,
2674                               self.pg0.local_ip6, self.pg0.remote_ip6)
2675         self.cli_verify_no_response(cli_del_cmd)
2676         # 2nd del is expected to fail
2677         self.cli_verify_response(
2678             cli_del_cmd,
2679             "bfd udp session del: `bfd_udp_del_session' API call"
2680             " failed, rv=-102:No such BFD object")
2681         self.assertFalse(vpp_session.query_vpp_config())
2682
2683     def test_auth_on_off(self):
2684         """ turn authentication on and off """
2685         key = self.factory.create_random_key(
2686             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2687         key.add_vpp_config()
2688         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2689         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2690                                         sha1_key=key)
2691         session.add_vpp_config()
2692         cli_activate = \
2693             "bfd udp session auth activate interface %s local-addr %s "\
2694             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2695             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2696                key.conf_key_id, auth_session.bfd_key_id)
2697         self.cli_verify_no_response(cli_activate)
2698         verify_bfd_session_config(self, auth_session)
2699         self.cli_verify_no_response(cli_activate)
2700         verify_bfd_session_config(self, auth_session)
2701         cli_deactivate = \
2702             "bfd udp session auth deactivate interface %s local-addr %s "\
2703             "peer-addr %s "\
2704             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2705         self.cli_verify_no_response(cli_deactivate)
2706         verify_bfd_session_config(self, session)
2707         self.cli_verify_no_response(cli_deactivate)
2708         verify_bfd_session_config(self, session)
2709
2710     def test_auth_on_off_delayed(self):
2711         """ turn authentication on and off (delayed) """
2712         key = self.factory.create_random_key(
2713             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2714         key.add_vpp_config()
2715         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2716         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2717                                         sha1_key=key)
2718         session.add_vpp_config()
2719         cli_activate = \
2720             "bfd udp session auth activate interface %s local-addr %s "\
2721             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2722             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2723                key.conf_key_id, auth_session.bfd_key_id)
2724         self.cli_verify_no_response(cli_activate)
2725         verify_bfd_session_config(self, auth_session)
2726         self.cli_verify_no_response(cli_activate)
2727         verify_bfd_session_config(self, auth_session)
2728         cli_deactivate = \
2729             "bfd udp session auth deactivate interface %s local-addr %s "\
2730             "peer-addr %s delayed yes"\
2731             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2732         self.cli_verify_no_response(cli_deactivate)
2733         verify_bfd_session_config(self, session)
2734         self.cli_verify_no_response(cli_deactivate)
2735         verify_bfd_session_config(self, session)
2736
2737     def test_admin_up_down(self):
2738         """ put session admin-up and admin-down """
2739         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2740         session.add_vpp_config()
2741         cli_down = \
2742             "bfd udp session set-flags admin down interface %s local-addr %s "\
2743             "peer-addr %s "\
2744             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2745         cli_up = \
2746             "bfd udp session set-flags admin up interface %s local-addr %s "\
2747             "peer-addr %s "\
2748             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2749         self.cli_verify_no_response(cli_down)
2750         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2751         self.cli_verify_no_response(cli_up)
2752         verify_bfd_session_config(self, session, state=BFDState.down)
2753
2754     def test_set_del_udp_echo_source(self):
2755         """ set/del udp echo source """
2756         self.create_loopback_interfaces(1)
2757         self.loopback0 = self.lo_interfaces[0]
2758         self.loopback0.admin_up()
2759         self.cli_verify_response("show bfd echo-source",
2760                                  "UDP echo source is not set.")
2761         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2762         self.cli_verify_no_response(cli_set)
2763         self.cli_verify_response("show bfd echo-source",
2764                                  "UDP echo source is: %s\n"
2765                                  "IPv4 address usable as echo source: none\n"
2766                                  "IPv6 address usable as echo source: none" %
2767                                  self.loopback0.name)
2768         self.loopback0.config_ip4()
2769         echo_ip4 = str(ipaddress.IPv4Address(int(ipaddress.IPv4Address(
2770             self.loopback0.local_ip4)) ^ 1))
2771         self.cli_verify_response("show bfd echo-source",
2772                                  "UDP echo source is: %s\n"
2773                                  "IPv4 address usable as echo source: %s\n"
2774                                  "IPv6 address usable as echo source: none" %
2775                                  (self.loopback0.name, echo_ip4))
2776         echo_ip6 = str(ipaddress.IPv6Address(int(ipaddress.IPv6Address(
2777             self.loopback0.local_ip6)) ^ 1))
2778         self.loopback0.config_ip6()
2779         self.cli_verify_response("show bfd echo-source",
2780                                  "UDP echo source is: %s\n"
2781                                  "IPv4 address usable as echo source: %s\n"
2782                                  "IPv6 address usable as echo source: %s" %
2783                                  (self.loopback0.name, echo_ip4, echo_ip6))
2784         cli_del = "bfd udp echo-source del"
2785         self.cli_verify_no_response(cli_del)
2786         self.cli_verify_response("show bfd echo-source",
2787                                  "UDP echo source is not set.")
2788
2789
# When this module is executed as a script, run its tests with the
# VppTestRunner instead of the default unittest runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)