ipsec: fix AES CBC IV generation (CVE-2022-46397)
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 import hashlib
8 import time
9 import unittest
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
13
14 from six import moves
15 import scapy.compat
16 from scapy.layers.inet import UDP, IP
17 from scapy.layers.inet6 import IPv6
18 from scapy.layers.l2 import Ether, GRE
19 from scapy.packet import Raw
20
21 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
22     BFDDiagCode, BFDState, BFD_vpp_echo
23 from framework import VppTestCase, VppTestRunner, running_extended_tests
24 from util import ppp
25 from vpp_ip import DpoProto
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import UnexpectedApiReturnValueError
29 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
30 from vpp_gre_interface import VppGreInterface
31
32 USEC_IN_SEC = 1000000
33
34
35 class AuthKeyFactory(object):
36     """Factory class for creating auth keys with unique conf key ID"""
37
38     def __init__(self):
39         self._conf_key_ids = {}
40
41     def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
42         """ create a random key with unique conf key id """
43         conf_key_id = randint(0, 0xFFFFFFFF)
44         while conf_key_id in self._conf_key_ids:
45             conf_key_id = randint(0, 0xFFFFFFFF)
46         self._conf_key_ids[conf_key_id] = 1
47         key = scapy.compat.raw(
48             bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
49         return VppBFDAuthKey(test=test, auth_type=auth_type,
50                              conf_key_id=conf_key_id, key=key)
51
52
53 @unittest.skipUnless(running_extended_tests, "part of extended tests")
54 class BFDAPITestCase(VppTestCase):
55     """Bidirectional Forwarding Detection (BFD) - API"""
56
57     pg0 = None
58     pg1 = None
59
60     @classmethod
61     def setUpClass(cls):
62         super(BFDAPITestCase, cls).setUpClass()
63         cls.vapi.cli("set log class bfd level debug")
64         try:
65             cls.create_pg_interfaces(range(2))
66             for i in cls.pg_interfaces:
67                 i.config_ip4()
68                 i.config_ip6()
69                 i.resolve_arp()
70
71         except Exception:
72             super(BFDAPITestCase, cls).tearDownClass()
73             raise
74
75     @classmethod
76     def tearDownClass(cls):
77         super(BFDAPITestCase, cls).tearDownClass()
78
79     def setUp(self):
80         super(BFDAPITestCase, self).setUp()
81         self.factory = AuthKeyFactory()
82
83     def test_add_bfd(self):
84         """ create a BFD session """
85         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
86         session.add_vpp_config()
87         self.logger.debug("Session state is %s", session.state)
88         session.remove_vpp_config()
89         session.add_vpp_config()
90         self.logger.debug("Session state is %s", session.state)
91         session.remove_vpp_config()
92
93     def test_double_add(self):
94         """ create the same BFD session twice (negative case) """
95         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
96         session.add_vpp_config()
97
98         with self.vapi.assert_negative_api_retval():
99             session.add_vpp_config()
100
101         session.remove_vpp_config()
102
103     def test_add_bfd6(self):
104         """ create IPv6 BFD session """
105         session = VppBFDUDPSession(
106             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
107         session.add_vpp_config()
108         self.logger.debug("Session state is %s", session.state)
109         session.remove_vpp_config()
110         session.add_vpp_config()
111         self.logger.debug("Session state is %s", session.state)
112         session.remove_vpp_config()
113
114     def test_mod_bfd(self):
115         """ modify BFD session parameters """
116         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
117                                    desired_min_tx=50000,
118                                    required_min_rx=10000,
119                                    detect_mult=1)
120         session.add_vpp_config()
121         s = session.get_bfd_udp_session_dump_entry()
122         self.assert_equal(session.desired_min_tx,
123                           s.desired_min_tx,
124                           "desired min transmit interval")
125         self.assert_equal(session.required_min_rx,
126                           s.required_min_rx,
127                           "required min receive interval")
128         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
129         session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
130                                   required_min_rx=session.required_min_rx * 2,
131                                   detect_mult=session.detect_mult * 2)
132         s = session.get_bfd_udp_session_dump_entry()
133         self.assert_equal(session.desired_min_tx,
134                           s.desired_min_tx,
135                           "desired min transmit interval")
136         self.assert_equal(session.required_min_rx,
137                           s.required_min_rx,
138                           "required min receive interval")
139         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
140
141     def test_add_sha1_keys(self):
142         """ add SHA1 keys """
143         key_count = 10
144         keys = [self.factory.create_random_key(
145             self) for i in range(0, key_count)]
146         for key in keys:
147             self.assertFalse(key.query_vpp_config())
148         for key in keys:
149             key.add_vpp_config()
150         for key in keys:
151             self.assertTrue(key.query_vpp_config())
152         # remove randomly
153         indexes = range(key_count)
154         shuffle(indexes)
155         removed = []
156         for i in indexes:
157             key = keys[i]
158             key.remove_vpp_config()
159             removed.append(i)
160             for j in range(key_count):
161                 key = keys[j]
162                 if j in removed:
163                     self.assertFalse(key.query_vpp_config())
164                 else:
165                     self.assertTrue(key.query_vpp_config())
166         # should be removed now
167         for key in keys:
168             self.assertFalse(key.query_vpp_config())
169         # add back and remove again
170         for key in keys:
171             key.add_vpp_config()
172         for key in keys:
173             self.assertTrue(key.query_vpp_config())
174         for key in keys:
175             key.remove_vpp_config()
176         for key in keys:
177             self.assertFalse(key.query_vpp_config())
178
179     def test_add_bfd_sha1(self):
180         """ create a BFD session (SHA1) """
181         key = self.factory.create_random_key(self)
182         key.add_vpp_config()
183         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
184                                    sha1_key=key)
185         session.add_vpp_config()
186         self.logger.debug("Session state is %s", session.state)
187         session.remove_vpp_config()
188         session.add_vpp_config()
189         self.logger.debug("Session state is %s", session.state)
190         session.remove_vpp_config()
191
192     def test_double_add_sha1(self):
193         """ create the same BFD session twice (negative case) (SHA1) """
194         key = self.factory.create_random_key(self)
195         key.add_vpp_config()
196         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
197                                    sha1_key=key)
198         session.add_vpp_config()
199         with self.assertRaises(Exception):
200             session.add_vpp_config()
201
202     def test_add_auth_nonexistent_key(self):
203         """ create BFD session using non-existent SHA1 (negative case) """
204         session = VppBFDUDPSession(
205             self, self.pg0, self.pg0.remote_ip4,
206             sha1_key=self.factory.create_random_key(self))
207         with self.assertRaises(Exception):
208             session.add_vpp_config()
209
210     def test_shared_sha1_key(self):
211         """ share single SHA1 key between multiple BFD sessions """
212         key = self.factory.create_random_key(self)
213         key.add_vpp_config()
214         sessions = [
215             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
216                              sha1_key=key),
217             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
218                              sha1_key=key, af=AF_INET6),
219             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
220                              sha1_key=key),
221             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
222                              sha1_key=key, af=AF_INET6)]
223         for s in sessions:
224             s.add_vpp_config()
225         removed = 0
226         for s in sessions:
227             e = key.get_bfd_auth_keys_dump_entry()
228             self.assert_equal(e.use_count, len(sessions) - removed,
229                               "Use count for shared key")
230             s.remove_vpp_config()
231             removed += 1
232         e = key.get_bfd_auth_keys_dump_entry()
233         self.assert_equal(e.use_count, len(sessions) - removed,
234                           "Use count for shared key")
235
236     def test_activate_auth(self):
237         """ activate SHA1 authentication """
238         key = self.factory.create_random_key(self)
239         key.add_vpp_config()
240         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
241         session.add_vpp_config()
242         session.activate_auth(key)
243
244     def test_deactivate_auth(self):
245         """ deactivate SHA1 authentication """
246         key = self.factory.create_random_key(self)
247         key.add_vpp_config()
248         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
249         session.add_vpp_config()
250         session.activate_auth(key)
251         session.deactivate_auth()
252
253     def test_change_key(self):
254         """ change SHA1 key """
255         key1 = self.factory.create_random_key(self)
256         key2 = self.factory.create_random_key(self)
257         while key2.conf_key_id == key1.conf_key_id:
258             key2 = self.factory.create_random_key(self)
259         key1.add_vpp_config()
260         key2.add_vpp_config()
261         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
262                                    sha1_key=key1)
263         session.add_vpp_config()
264         session.activate_auth(key2)
265
266     def test_set_del_udp_echo_source(self):
267         """ set/del udp echo source """
268         self.create_loopback_interfaces(1)
269         self.loopback0 = self.lo_interfaces[0]
270         self.loopback0.admin_up()
271         echo_source = self.vapi.bfd_udp_get_echo_source()
272         self.assertFalse(echo_source.is_set)
273         self.assertFalse(echo_source.have_usable_ip4)
274         self.assertFalse(echo_source.have_usable_ip6)
275
276         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
277         echo_source = self.vapi.bfd_udp_get_echo_source()
278         self.assertTrue(echo_source.is_set)
279         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
280         self.assertFalse(echo_source.have_usable_ip4)
281         self.assertFalse(echo_source.have_usable_ip6)
282
283         self.loopback0.config_ip4()
284         unpacked = unpack("!L", self.loopback0.local_ip4n)
285         echo_ip4 = pack("!L", unpacked[0] ^ 1)
286         echo_source = self.vapi.bfd_udp_get_echo_source()
287         self.assertTrue(echo_source.is_set)
288         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
289         self.assertTrue(echo_source.have_usable_ip4)
290         self.assertEqual(echo_source.ip4_addr, echo_ip4)
291         self.assertFalse(echo_source.have_usable_ip6)
292
293         self.loopback0.config_ip6()
294         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
295         echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
296                         unpacked[3] ^ 1)
297         echo_source = self.vapi.bfd_udp_get_echo_source()
298         self.assertTrue(echo_source.is_set)
299         self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
300         self.assertTrue(echo_source.have_usable_ip4)
301         self.assertEqual(echo_source.ip4_addr, echo_ip4)
302         self.assertTrue(echo_source.have_usable_ip6)
303         self.assertEqual(echo_source.ip6_addr, echo_ip6)
304
305         self.vapi.bfd_udp_del_echo_source()
306         echo_source = self.vapi.bfd_udp_get_echo_source()
307         self.assertFalse(echo_source.is_set)
308         self.assertFalse(echo_source.have_usable_ip4)
309         self.assertFalse(echo_source.have_usable_ip6)
310
311
312 class BFDTestSession(object):
313     """ BFD session as seen from test framework side """
314
315     def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
316                  bfd_key_id=None, our_seq_number=None,
317                  tunnel_header=None, phy_interface=None):
318         self.test = test
319         self.af = af
320         self.sha1_key = sha1_key
321         self.bfd_key_id = bfd_key_id
322         self.interface = interface
323         if phy_interface:
324             self.phy_interface = phy_interface
325         else:
326             self.phy_interface = self.interface
327         self.udp_sport = randint(49152, 65535)
328         if our_seq_number is None:
329             self.our_seq_number = randint(0, 40000000)
330         else:
331             self.our_seq_number = our_seq_number
332         self.vpp_seq_number = None
333         self.my_discriminator = 0
334         self.desired_min_tx = 300000
335         self.required_min_rx = 300000
336         self.required_min_echo_rx = None
337         self.detect_mult = detect_mult
338         self.diag = BFDDiagCode.no_diagnostic
339         self.your_discriminator = None
340         self.state = BFDState.down
341         self.auth_type = BFDAuthType.no_auth
342         self.tunnel_header = tunnel_header
343
344     def inc_seq_num(self):
345         """ increment sequence number, wrapping if needed """
346         if self.our_seq_number == 0xFFFFFFFF:
347             self.our_seq_number = 0
348         else:
349             self.our_seq_number += 1
350
351     def update(self, my_discriminator=None, your_discriminator=None,
352                desired_min_tx=None, required_min_rx=None,
353                required_min_echo_rx=None, detect_mult=None,
354                diag=None, state=None, auth_type=None):
355         """ update BFD parameters associated with session """
356         if my_discriminator is not None:
357             self.my_discriminator = my_discriminator
358         if your_discriminator is not None:
359             self.your_discriminator = your_discriminator
360         if required_min_rx is not None:
361             self.required_min_rx = required_min_rx
362         if required_min_echo_rx is not None:
363             self.required_min_echo_rx = required_min_echo_rx
364         if desired_min_tx is not None:
365             self.desired_min_tx = desired_min_tx
366         if detect_mult is not None:
367             self.detect_mult = detect_mult
368         if diag is not None:
369             self.diag = diag
370         if state is not None:
371             self.state = state
372         if auth_type is not None:
373             self.auth_type = auth_type
374
375     def fill_packet_fields(self, packet):
376         """ set packet fields with known values in packet """
377         bfd = packet[BFD]
378         if self.my_discriminator:
379             self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
380                                    self.my_discriminator)
381             bfd.my_discriminator = self.my_discriminator
382         if self.your_discriminator:
383             self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
384                                    self.your_discriminator)
385             bfd.your_discriminator = self.your_discriminator
386         if self.required_min_rx:
387             self.test.logger.debug(
388                 "BFD: setting packet.required_min_rx_interval=%s",
389                 self.required_min_rx)
390             bfd.required_min_rx_interval = self.required_min_rx
391         if self.required_min_echo_rx:
392             self.test.logger.debug(
393                 "BFD: setting packet.required_min_echo_rx=%s",
394                 self.required_min_echo_rx)
395             bfd.required_min_echo_rx_interval = self.required_min_echo_rx
396         if self.desired_min_tx:
397             self.test.logger.debug(
398                 "BFD: setting packet.desired_min_tx_interval=%s",
399                 self.desired_min_tx)
400             bfd.desired_min_tx_interval = self.desired_min_tx
401         if self.detect_mult:
402             self.test.logger.debug(
403                 "BFD: setting packet.detect_mult=%s", self.detect_mult)
404             bfd.detect_mult = self.detect_mult
405         if self.diag:
406             self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
407             bfd.diag = self.diag
408         if self.state:
409             self.test.logger.debug("BFD: setting packet.state=%s", self.state)
410             bfd.state = self.state
411         if self.auth_type:
412             # this is used by a negative test-case
413             self.test.logger.debug("BFD: setting packet.auth_type=%s",
414                                    self.auth_type)
415             bfd.auth_type = self.auth_type
416
417     def create_packet(self):
418         """ create a BFD packet, reflecting the current state of session """
419         if self.sha1_key:
420             bfd = BFD(flags="A")
421             bfd.auth_type = self.sha1_key.auth_type
422             bfd.auth_len = BFD.sha1_auth_len
423             bfd.auth_key_id = self.bfd_key_id
424             bfd.auth_seq_num = self.our_seq_number
425             bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
426         else:
427             bfd = BFD()
428         packet = Ether(src=self.phy_interface.remote_mac,
429                        dst=self.phy_interface.local_mac)
430         if self.tunnel_header:
431             packet = packet / self.tunnel_header
432         if self.af == AF_INET6:
433             packet = (packet /
434                       IPv6(src=self.interface.remote_ip6,
435                            dst=self.interface.local_ip6,
436                            hlim=255) /
437                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
438                       bfd)
439         else:
440             packet = (packet /
441                       IP(src=self.interface.remote_ip4,
442                          dst=self.interface.local_ip4,
443                          ttl=255) /
444                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
445                       bfd)
446         self.test.logger.debug("BFD: Creating packet")
447         self.fill_packet_fields(packet)
448         if self.sha1_key:
449             hash_material = scapy.compat.raw(
450                 packet[BFD])[:32] + self.sha1_key.key + \
451                 "\0" * (20 - len(self.sha1_key.key))
452             self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
453                                    hashlib.sha1(hash_material).hexdigest())
454             packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
455         return packet
456
457     def send_packet(self, packet=None, interface=None):
458         """ send packet on interface, creating the packet if needed """
459         if packet is None:
460             packet = self.create_packet()
461         if interface is None:
462             interface = self.phy_interface
463         self.test.logger.debug(ppp("Sending packet:", packet))
464         interface.add_stream(packet)
465         self.test.pg_start()
466
467     def verify_sha1_auth(self, packet):
468         """ Verify correctness of authentication in BFD layer. """
469         bfd = packet[BFD]
470         self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
471         self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
472                                BFDAuthType)
473         self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
474         self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
475         if self.vpp_seq_number is None:
476             self.vpp_seq_number = bfd.auth_seq_num
477             self.test.logger.debug("Received initial sequence number: %s" %
478                                    self.vpp_seq_number)
479         else:
480             recvd_seq_num = bfd.auth_seq_num
481             self.test.logger.debug("Received followup sequence number: %s" %
482                                    recvd_seq_num)
483             if self.vpp_seq_number < 0xffffffff:
484                 if self.sha1_key.auth_type == \
485                         BFDAuthType.meticulous_keyed_sha1:
486                     self.test.assert_equal(recvd_seq_num,
487                                            self.vpp_seq_number + 1,
488                                            "BFD sequence number")
489                 else:
490                     self.test.assert_in_range(recvd_seq_num,
491                                               self.vpp_seq_number,
492                                               self.vpp_seq_number + 1,
493                                               "BFD sequence number")
494             else:
495                 if self.sha1_key.auth_type == \
496                         BFDAuthType.meticulous_keyed_sha1:
497                     self.test.assert_equal(recvd_seq_num, 0,
498                                            "BFD sequence number")
499                 else:
500                     self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
501                                        "BFD sequence number not one of "
502                                        "(%s, 0)" % self.vpp_seq_number)
503             self.vpp_seq_number = recvd_seq_num
504         # last 20 bytes represent the hash - so replace them with the key,
505         # pad the result with zeros and hash the result
506         hash_material = bfd.original[:-20] + self.sha1_key.key + \
507             b"\0" * (20 - len(self.sha1_key.key))
508         expected_hash = hashlib.sha1(hash_material).hexdigest()
509         self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
510                                expected_hash, "Auth key hash")
511
512     def verify_bfd(self, packet):
513         """ Verify correctness of BFD layer. """
514         bfd = packet[BFD]
515         self.test.assert_equal(bfd.version, 1, "BFD version")
516         self.test.assert_equal(bfd.your_discriminator,
517                                self.my_discriminator,
518                                "BFD - your discriminator")
519         if self.sha1_key:
520             self.verify_sha1_auth(packet)
521
522
523 def bfd_session_up(test):
524     """ Bring BFD session up """
525     test.logger.info("BFD: Waiting for slow hello")
526     p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
527     old_offset = None
528     if hasattr(test, 'vpp_clock_offset'):
529         old_offset = test.vpp_clock_offset
530     test.vpp_clock_offset = time.time() - p.time
531     test.logger.debug("BFD: Calculated vpp clock offset: %s",
532                       test.vpp_clock_offset)
533     if old_offset:
534         test.assertAlmostEqual(
535             old_offset, test.vpp_clock_offset, delta=0.5,
536             msg="vpp clock offset not stable (new: %s, old: %s)" %
537             (test.vpp_clock_offset, old_offset))
538     test.logger.info("BFD: Sending Init")
539     test.test_session.update(my_discriminator=randint(0, 40000000),
540                              your_discriminator=p[BFD].my_discriminator,
541                              state=BFDState.init)
542     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
543             BFDAuthType.meticulous_keyed_sha1:
544         test.test_session.inc_seq_num()
545     test.test_session.send_packet()
546     test.logger.info("BFD: Waiting for event")
547     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
548     verify_event(test, e, expected_state=BFDState.up)
549     test.logger.info("BFD: Session is Up")
550     test.test_session.update(state=BFDState.up)
551     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
552             BFDAuthType.meticulous_keyed_sha1:
553         test.test_session.inc_seq_num()
554     test.test_session.send_packet()
555     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
556
557
558 def bfd_session_down(test):
559     """ Bring BFD session down """
560     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
561     test.test_session.update(state=BFDState.down)
562     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
563             BFDAuthType.meticulous_keyed_sha1:
564         test.test_session.inc_seq_num()
565     test.test_session.send_packet()
566     test.logger.info("BFD: Waiting for event")
567     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
568     verify_event(test, e, expected_state=BFDState.down)
569     test.logger.info("BFD: Session is Down")
570     test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
571
572
573 def verify_bfd_session_config(test, session, state=None):
574     dump = session.get_bfd_udp_session_dump_entry()
575     test.assertIsNotNone(dump)
576     # since dump is not none, we have verified that sw_if_index and addresses
577     # are valid (in get_bfd_udp_session_dump_entry)
578     if state:
579         test.assert_equal(dump.state, state, "session state")
580     test.assert_equal(dump.required_min_rx, session.required_min_rx,
581                       "required min rx interval")
582     test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
583                       "desired min tx interval")
584     test.assert_equal(dump.detect_mult, session.detect_mult,
585                       "detect multiplier")
586     if session.sha1_key is None:
587         test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
588     else:
589         test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
590         test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
591                           "bfd key id")
592         test.assert_equal(dump.conf_key_id,
593                           session.sha1_key.conf_key_id,
594                           "config key id")
595
596
597 def verify_ip(test, packet):
598     """ Verify correctness of IP layer. """
599     if test.vpp_session.af == AF_INET6:
600         ip = packet[IPv6]
601         local_ip = test.vpp_session.interface.local_ip6
602         remote_ip = test.vpp_session.interface.remote_ip6
603         test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
604     else:
605         ip = packet[IP]
606         local_ip = test.vpp_session.interface.local_ip4
607         remote_ip = test.vpp_session.interface.remote_ip4
608         test.assert_equal(ip.ttl, 255, "IPv4 TTL")
609     test.assert_equal(ip.src, local_ip, "IP source address")
610     test.assert_equal(ip.dst, remote_ip, "IP destination address")
611
612
613 def verify_udp(test, packet):
614     """ Verify correctness of UDP layer. """
615     udp = packet[UDP]
616     test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
617     test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
618                          "UDP source port")
619
620
621 def verify_event(test, event, expected_state):
622     """ Verify correctness of event values. """
623     e = event
624     test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
625     test.assert_equal(e.sw_if_index,
626                       test.vpp_session.interface.sw_if_index,
627                       "BFD interface index")
628     is_ipv6 = 0
629     if test.vpp_session.af == AF_INET6:
630         is_ipv6 = 1
631     test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
632     if test.vpp_session.af == AF_INET:
633         test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
634                           "Local IPv4 address")
635         test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
636                           "Peer IPv4 address")
637     else:
638         test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
639                           "Local IPv6 address")
640         test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
641                           "Peer IPv6 address")
642     test.assert_equal(e.state, expected_state, BFDState)
643
644
645 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
646     """ wait for BFD packet and verify its correctness
647
648     :param timeout: how long to wait
649     :param pcap_time_min: ignore packets with pcap timestamp lower than this
650
651     :returns: tuple (packet, time spent waiting for packet)
652     """
653     test.logger.info("BFD: Waiting for BFD packet")
654     deadline = time.time() + timeout
655     counter = 0
656     while True:
657         counter += 1
658         # sanity check
659         test.assert_in_range(counter, 0, 100, "number of packets ignored")
660         time_left = deadline - time.time()
661         if time_left < 0:
662             raise CaptureTimeoutError("Packet did not arrive within timeout")
663         p = test.pg0.wait_for_packet(timeout=time_left)
664         test.logger.debug(ppp("BFD: Got packet:", p))
665         if pcap_time_min is not None and p.time < pcap_time_min:
666             test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
667                                   "pcap time min %s):" %
668                                   (p.time, pcap_time_min), p))
669         else:
670             break
671     if is_tunnel:
672         # strip an IP layer and move to the next
673         p = p[IP].payload
674
675     bfd = p[BFD]
676     if bfd is None:
677         raise Exception(ppp("Unexpected or invalid BFD packet:", p))
678     if bfd.payload:
679         raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
680     verify_ip(test, p)
681     verify_udp(test, p)
682     test.test_session.verify_bfd(p)
683     return p
684
685
686 @unittest.skipUnless(running_extended_tests, "part of extended tests")
687 class BFD4TestCase(VppTestCase):
688     """Bidirectional Forwarding Detection (BFD)"""
689
690     pg0 = None
691     vpp_clock_offset = None
692     vpp_session = None
693     test_session = None
694
695     @classmethod
696     def setUpClass(cls):
697         super(BFD4TestCase, cls).setUpClass()
698         cls.vapi.cli("set log class bfd level debug")
699         try:
700             cls.create_pg_interfaces([0])
701             cls.create_loopback_interfaces(1)
702             cls.loopback0 = cls.lo_interfaces[0]
703             cls.loopback0.config_ip4()
704             cls.loopback0.admin_up()
705             cls.pg0.config_ip4()
706             cls.pg0.configure_ipv4_neighbors()
707             cls.pg0.admin_up()
708             cls.pg0.resolve_arp()
709
710         except Exception:
711             super(BFD4TestCase, cls).tearDownClass()
712             raise
713
714     @classmethod
715     def tearDownClass(cls):
716         super(BFD4TestCase, cls).tearDownClass()
717
718     def setUp(self):
719         super(BFD4TestCase, self).setUp()
720         self.factory = AuthKeyFactory()
721         self.vapi.want_bfd_events()
722         self.pg0.enable_capture()
723         try:
724             self.vpp_session = VppBFDUDPSession(self, self.pg0,
725                                                 self.pg0.remote_ip4)
726             self.vpp_session.add_vpp_config()
727             self.vpp_session.admin_up()
728             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
729         except:
730             self.vapi.want_bfd_events(enable_disable=0)
731             raise
732
733     def tearDown(self):
734         if not self.vpp_dead:
735             self.vapi.want_bfd_events(enable_disable=0)
736         self.vapi.collect_events()  # clear the event queue
737         super(BFD4TestCase, self).tearDown()
738
739     def test_session_up(self):
740         """ bring BFD session up """
741         bfd_session_up(self)
742
743     def test_session_up_by_ip(self):
744         """ bring BFD session up - first frame looked up by address pair """
745         self.logger.info("BFD: Sending Slow control frame")
746         self.test_session.update(my_discriminator=randint(0, 40000000))
747         self.test_session.send_packet()
748         self.pg0.enable_capture()
749         p = self.pg0.wait_for_packet(1)
750         self.assert_equal(p[BFD].your_discriminator,
751                           self.test_session.my_discriminator,
752                           "BFD - your discriminator")
753         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
754         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
755                                  state=BFDState.up)
756         self.logger.info("BFD: Waiting for event")
757         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
758         verify_event(self, e, expected_state=BFDState.init)
759         self.logger.info("BFD: Sending Up")
760         self.test_session.send_packet()
761         self.logger.info("BFD: Waiting for event")
762         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
763         verify_event(self, e, expected_state=BFDState.up)
764         self.logger.info("BFD: Session is Up")
765         self.test_session.update(state=BFDState.up)
766         self.test_session.send_packet()
767         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
768
769     def test_session_down(self):
770         """ bring BFD session down """
771         bfd_session_up(self)
772         bfd_session_down(self)
773
774     @unittest.skipUnless(running_extended_tests, "part of extended tests")
775     def test_hold_up(self):
776         """ hold BFD session up """
777         bfd_session_up(self)
778         for dummy in range(self.test_session.detect_mult * 2):
779             wait_for_bfd_packet(self)
780             self.test_session.send_packet()
781         self.assert_equal(len(self.vapi.collect_events()), 0,
782                           "number of bfd events")
783
784     @unittest.skipUnless(running_extended_tests, "part of extended tests")
785     def test_slow_timer(self):
786         """ verify slow periodic control frames while session down """
787         packet_count = 3
788         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
789         prev_packet = wait_for_bfd_packet(self, 2)
790         for dummy in range(packet_count):
791             next_packet = wait_for_bfd_packet(self, 2)
792             time_diff = next_packet.time - prev_packet.time
793             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
794             # to work around timing issues
795             self.assert_in_range(
796                 time_diff, 0.70, 1.05, "time between slow packets")
797             prev_packet = next_packet
798
799     @unittest.skipUnless(running_extended_tests, "part of extended tests")
800     def test_zero_remote_min_rx(self):
801         """ no packets when zero remote required min rx interval """
802         bfd_session_up(self)
803         self.test_session.update(required_min_rx=0)
804         self.test_session.send_packet()
805         for dummy in range(self.test_session.detect_mult):
806             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
807                        "sleep before transmitting bfd packet")
808             self.test_session.send_packet()
809             try:
810                 p = wait_for_bfd_packet(self, timeout=0)
811                 self.logger.error(ppp("Received unexpected packet:", p))
812             except CaptureTimeoutError:
813                 pass
814         self.assert_equal(
815             len(self.vapi.collect_events()), 0, "number of bfd events")
816         self.test_session.update(required_min_rx=300000)
817         for dummy in range(3):
818             self.test_session.send_packet()
819             wait_for_bfd_packet(
820                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
821         self.assert_equal(
822             len(self.vapi.collect_events()), 0, "number of bfd events")
823
824     @unittest.skipUnless(running_extended_tests, "part of extended tests")
825     def test_conn_down(self):
826         """ verify session goes down after inactivity """
827         bfd_session_up(self)
828         detection_time = self.test_session.detect_mult *\
829             self.vpp_session.required_min_rx / USEC_IN_SEC
830         self.sleep(detection_time, "waiting for BFD session time-out")
831         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
832         verify_event(self, e, expected_state=BFDState.down)
833
834     def test_peer_discr_reset_sess_down(self):
835         """ peer discriminator reset after session goes down """
836         bfd_session_up(self)
837         detection_time = self.test_session.detect_mult *\
838             self.vpp_session.required_min_rx / USEC_IN_SEC
839         self.sleep(detection_time, "waiting for BFD session time-out")
840         self.test_session.my_discriminator = 0
841         wait_for_bfd_packet(self,
842                             pcap_time_min=time.time() - self.vpp_clock_offset)
843
844     @unittest.skipUnless(running_extended_tests, "part of extended tests")
845     def test_large_required_min_rx(self):
846         """ large remote required min rx interval """
847         bfd_session_up(self)
848         p = wait_for_bfd_packet(self)
849         interval = 3000000
850         self.test_session.update(required_min_rx=interval)
851         self.test_session.send_packet()
852         time_mark = time.time()
853         count = 0
854         # busy wait here, trying to collect a packet or event, vpp is not
855         # allowed to send packets and the session will timeout first - so the
856         # Up->Down event must arrive before any packets do
857         while time.time() < time_mark + interval / USEC_IN_SEC:
858             try:
859                 p = wait_for_bfd_packet(self, timeout=0)
860                 # if vpp managed to send a packet before we did the session
861                 # session update, then that's fine, ignore it
862                 if p.time < time_mark - self.vpp_clock_offset:
863                     continue
864                 self.logger.error(ppp("Received unexpected packet:", p))
865                 count += 1
866             except CaptureTimeoutError:
867                 pass
868             events = self.vapi.collect_events()
869             if len(events) > 0:
870                 verify_event(self, events[0], BFDState.down)
871                 break
872         self.assert_equal(count, 0, "number of packets received")
873
874     @unittest.skipUnless(running_extended_tests, "part of extended tests")
875     def test_immediate_remote_min_rx_reduction(self):
876         """ immediately honor remote required min rx reduction """
877         self.vpp_session.remove_vpp_config()
878         self.vpp_session = VppBFDUDPSession(
879             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
880         self.pg0.enable_capture()
881         self.vpp_session.add_vpp_config()
882         self.test_session.update(desired_min_tx=1000000,
883                                  required_min_rx=1000000)
884         bfd_session_up(self)
885         reference_packet = wait_for_bfd_packet(self)
886         time_mark = time.time()
887         interval = 300000
888         self.test_session.update(required_min_rx=interval)
889         self.test_session.send_packet()
890         extra_time = time.time() - time_mark
891         p = wait_for_bfd_packet(self)
892         # first packet is allowed to be late by time we spent doing the update
893         # calculated in extra_time
894         self.assert_in_range(p.time - reference_packet.time,
895                              .95 * 0.75 * interval / USEC_IN_SEC,
896                              1.05 * interval / USEC_IN_SEC + extra_time,
897                              "time between BFD packets")
898         reference_packet = p
899         for dummy in range(3):
900             p = wait_for_bfd_packet(self)
901             diff = p.time - reference_packet.time
902             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
903                                  1.05 * interval / USEC_IN_SEC,
904                                  "time between BFD packets")
905             reference_packet = p
906
907     @unittest.skipUnless(running_extended_tests, "part of extended tests")
908     def test_modify_req_min_rx_double(self):
909         """ modify session - double required min rx """
910         bfd_session_up(self)
911         p = wait_for_bfd_packet(self)
912         self.test_session.update(desired_min_tx=10000,
913                                  required_min_rx=10000)
914         self.test_session.send_packet()
915         # double required min rx
916         self.vpp_session.modify_parameters(
917             required_min_rx=2 * self.vpp_session.required_min_rx)
918         p = wait_for_bfd_packet(
919             self, pcap_time_min=time.time() - self.vpp_clock_offset)
920         # poll bit needs to be set
921         self.assertIn("P", p.sprintf("%BFD.flags%"),
922                       "Poll bit not set in BFD packet")
923         # finish poll sequence with final packet
924         final = self.test_session.create_packet()
925         final[BFD].flags = "F"
926         timeout = self.test_session.detect_mult * \
927             max(self.test_session.desired_min_tx,
928                 self.vpp_session.required_min_rx) / USEC_IN_SEC
929         self.test_session.send_packet(final)
930         time_mark = time.time()
931         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
932         verify_event(self, e, expected_state=BFDState.down)
933         time_to_event = time.time() - time_mark
934         self.assert_in_range(time_to_event, .9 * timeout,
935                              1.1 * timeout, "session timeout")
936
937     @unittest.skipUnless(running_extended_tests, "part of extended tests")
938     def test_modify_req_min_rx_halve(self):
939         """ modify session - halve required min rx """
940         self.vpp_session.modify_parameters(
941             required_min_rx=2 * self.vpp_session.required_min_rx)
942         bfd_session_up(self)
943         p = wait_for_bfd_packet(self)
944         self.test_session.update(desired_min_tx=10000,
945                                  required_min_rx=10000)
946         self.test_session.send_packet()
947         p = wait_for_bfd_packet(
948             self, pcap_time_min=time.time() - self.vpp_clock_offset)
949         # halve required min rx
950         old_required_min_rx = self.vpp_session.required_min_rx
951         self.vpp_session.modify_parameters(
952             required_min_rx=self.vpp_session.required_min_rx // 2)
953         # now we wait 0.8*3*old-req-min-rx and the session should still be up
954         self.sleep(0.8 * self.vpp_session.detect_mult *
955                    old_required_min_rx / USEC_IN_SEC,
956                    "wait before finishing poll sequence")
957         self.assert_equal(len(self.vapi.collect_events()), 0,
958                           "number of bfd events")
959         p = wait_for_bfd_packet(self)
960         # poll bit needs to be set
961         self.assertIn("P", p.sprintf("%BFD.flags%"),
962                       "Poll bit not set in BFD packet")
963         # finish poll sequence with final packet
964         final = self.test_session.create_packet()
965         final[BFD].flags = "F"
966         self.test_session.send_packet(final)
967         # now the session should time out under new conditions
968         detection_time = self.test_session.detect_mult *\
969             self.vpp_session.required_min_rx / USEC_IN_SEC
970         before = time.time()
971         e = self.vapi.wait_for_event(
972             2 * detection_time, "bfd_udp_session_details")
973         after = time.time()
974         self.assert_in_range(after - before,
975                              0.9 * detection_time,
976                              1.1 * detection_time,
977                              "time before bfd session goes down")
978         verify_event(self, e, expected_state=BFDState.down)
979
980     @unittest.skipUnless(running_extended_tests, "part of extended tests")
981     def test_modify_detect_mult(self):
982         """ modify detect multiplier """
983         bfd_session_up(self)
984         p = wait_for_bfd_packet(self)
985         self.vpp_session.modify_parameters(detect_mult=1)
986         p = wait_for_bfd_packet(
987             self, pcap_time_min=time.time() - self.vpp_clock_offset)
988         self.assert_equal(self.vpp_session.detect_mult,
989                           p[BFD].detect_mult,
990                           "detect mult")
991         # poll bit must not be set
992         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
993                          "Poll bit not set in BFD packet")
994         self.vpp_session.modify_parameters(detect_mult=10)
995         p = wait_for_bfd_packet(
996             self, pcap_time_min=time.time() - self.vpp_clock_offset)
997         self.assert_equal(self.vpp_session.detect_mult,
998                           p[BFD].detect_mult,
999                           "detect mult")
1000         # poll bit must not be set
1001         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1002                          "Poll bit not set in BFD packet")
1003
1004     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1005     def test_queued_poll(self):
1006         """ test poll sequence queueing """
1007         bfd_session_up(self)
1008         p = wait_for_bfd_packet(self)
1009         self.vpp_session.modify_parameters(
1010             required_min_rx=2 * self.vpp_session.required_min_rx)
1011         p = wait_for_bfd_packet(self)
1012         poll_sequence_start = time.time()
1013         poll_sequence_length_min = 0.5
1014         send_final_after = time.time() + poll_sequence_length_min
1015         # poll bit needs to be set
1016         self.assertIn("P", p.sprintf("%BFD.flags%"),
1017                       "Poll bit not set in BFD packet")
1018         self.assert_equal(p[BFD].required_min_rx_interval,
1019                           self.vpp_session.required_min_rx,
1020                           "BFD required min rx interval")
1021         self.vpp_session.modify_parameters(
1022             required_min_rx=2 * self.vpp_session.required_min_rx)
1023         # 2nd poll sequence should be queued now
1024         # don't send the reply back yet, wait for some time to emulate
1025         # longer round-trip time
1026         packet_count = 0
1027         while time.time() < send_final_after:
1028             self.test_session.send_packet()
1029             p = wait_for_bfd_packet(self)
1030             self.assert_equal(len(self.vapi.collect_events()), 0,
1031                               "number of bfd events")
1032             self.assert_equal(p[BFD].required_min_rx_interval,
1033                               self.vpp_session.required_min_rx,
1034                               "BFD required min rx interval")
1035             packet_count += 1
1036             # poll bit must be set
1037             self.assertIn("P", p.sprintf("%BFD.flags%"),
1038                           "Poll bit not set in BFD packet")
1039         final = self.test_session.create_packet()
1040         final[BFD].flags = "F"
1041         self.test_session.send_packet(final)
1042         # finish 1st with final
1043         poll_sequence_length = time.time() - poll_sequence_start
1044         # vpp must wait for some time before starting new poll sequence
1045         poll_no_2_started = False
1046         for dummy in range(2 * packet_count):
1047             p = wait_for_bfd_packet(self)
1048             self.assert_equal(len(self.vapi.collect_events()), 0,
1049                               "number of bfd events")
1050             if "P" in p.sprintf("%BFD.flags%"):
1051                 poll_no_2_started = True
1052                 if time.time() < poll_sequence_start + poll_sequence_length:
1053                     raise Exception("VPP started 2nd poll sequence too soon")
1054                 final = self.test_session.create_packet()
1055                 final[BFD].flags = "F"
1056                 self.test_session.send_packet(final)
1057                 break
1058             else:
1059                 self.test_session.send_packet()
1060         self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
1061         # finish 2nd with final
1062         final = self.test_session.create_packet()
1063         final[BFD].flags = "F"
1064         self.test_session.send_packet(final)
1065         p = wait_for_bfd_packet(self)
1066         # poll bit must not be set
1067         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
1068                          "Poll bit set in BFD packet")
1069
1070     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1071     def test_poll_response(self):
1072         """ test correct response to control frame with poll bit set """
1073         bfd_session_up(self)
1074         poll = self.test_session.create_packet()
1075         poll[BFD].flags = "P"
1076         self.test_session.send_packet(poll)
1077         final = wait_for_bfd_packet(
1078             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1079         self.assertIn("F", final.sprintf("%BFD.flags%"))
1080
1081     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1082     def test_no_periodic_if_remote_demand(self):
1083         """ no periodic frames outside poll sequence if remote demand set """
1084         bfd_session_up(self)
1085         demand = self.test_session.create_packet()
1086         demand[BFD].flags = "D"
1087         self.test_session.send_packet(demand)
1088         transmit_time = 0.9 \
1089             * max(self.vpp_session.required_min_rx,
1090                   self.test_session.desired_min_tx) \
1091             / USEC_IN_SEC
1092         count = 0
1093         for dummy in range(self.test_session.detect_mult * 2):
1094             self.sleep(transmit_time)
1095             self.test_session.send_packet(demand)
1096             try:
1097                 p = wait_for_bfd_packet(self, timeout=0)
1098                 self.logger.error(ppp("Received unexpected packet:", p))
1099                 count += 1
1100             except CaptureTimeoutError:
1101                 pass
1102         events = self.vapi.collect_events()
1103         for e in events:
1104             self.logger.error("Received unexpected event: %s", e)
1105         self.assert_equal(count, 0, "number of packets received")
1106         self.assert_equal(len(events), 0, "number of events received")
1107
1108     def test_echo_looped_back(self):
1109         """ echo packets looped back """
1110         # don't need a session in this case..
1111         self.vpp_session.remove_vpp_config()
1112         self.pg0.enable_capture()
1113         echo_packet_count = 10
1114         # random source port low enough to increment a few times..
1115         udp_sport_tx = randint(1, 50000)
1116         udp_sport_rx = udp_sport_tx
1117         echo_packet = (Ether(src=self.pg0.remote_mac,
1118                              dst=self.pg0.local_mac) /
1119                        IP(src=self.pg0.remote_ip4,
1120                           dst=self.pg0.remote_ip4) /
1121                        UDP(dport=BFD.udp_dport_echo) /
1122                        Raw("this should be looped back"))
1123         for dummy in range(echo_packet_count):
1124             self.sleep(.01, "delay between echo packets")
1125             echo_packet[UDP].sport = udp_sport_tx
1126             udp_sport_tx += 1
1127             self.logger.debug(ppp("Sending packet:", echo_packet))
1128             self.pg0.add_stream(echo_packet)
1129             self.pg_start()
1130         for dummy in range(echo_packet_count):
1131             p = self.pg0.wait_for_packet(1)
1132             self.logger.debug(ppp("Got packet:", p))
1133             ether = p[Ether]
1134             self.assert_equal(self.pg0.remote_mac,
1135                               ether.dst, "Destination MAC")
1136             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1137             ip = p[IP]
1138             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1139             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1140             udp = p[UDP]
1141             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1142                               "UDP destination port")
1143             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1144             udp_sport_rx += 1
1145             # need to compare the hex payload here, otherwise BFD_vpp_echo
1146             # gets in way
1147             self.assertEqual(scapy.compat.raw(p[UDP].payload),
1148                              scapy.compat.raw(echo_packet[UDP].payload),
1149                              "Received packet is not the echo packet sent")
1150         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1151                           "ECHO packet identifier for test purposes)")
1152
1153     def test_echo(self):
1154         """ echo function """
1155         bfd_session_up(self)
1156         self.test_session.update(required_min_echo_rx=150000)
1157         self.test_session.send_packet()
1158         detection_time = self.test_session.detect_mult *\
1159             self.vpp_session.required_min_rx / USEC_IN_SEC
1160         # echo shouldn't work without echo source set
1161         for dummy in range(10):
1162             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1163             self.sleep(sleep, "delay before sending bfd packet")
1164             self.test_session.send_packet()
1165         p = wait_for_bfd_packet(
1166             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1167         self.assert_equal(p[BFD].required_min_rx_interval,
1168                           self.vpp_session.required_min_rx,
1169                           "BFD required min rx interval")
1170         self.test_session.send_packet()
1171         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1172         echo_seen = False
1173         # should be turned on - loopback echo packets
1174         for dummy in range(3):
1175             loop_until = time.time() + 0.75 * detection_time
1176             while time.time() < loop_until:
1177                 p = self.pg0.wait_for_packet(1)
1178                 self.logger.debug(ppp("Got packet:", p))
1179                 if p[UDP].dport == BFD.udp_dport_echo:
1180                     self.assert_equal(
1181                         p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1182                     self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1183                                         "BFD ECHO src IP equal to loopback IP")
1184                     self.logger.debug(ppp("Looping back packet:", p))
1185                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1186                                       "ECHO packet destination MAC address")
1187                     p[Ether].dst = self.pg0.local_mac
1188                     self.pg0.add_stream(p)
1189                     self.pg_start()
1190                     echo_seen = True
1191                 elif p.haslayer(BFD):
1192                     if echo_seen:
1193                         self.assertGreaterEqual(
1194                             p[BFD].required_min_rx_interval,
1195                             1000000)
1196                     if "P" in p.sprintf("%BFD.flags%"):
1197                         final = self.test_session.create_packet()
1198                         final[BFD].flags = "F"
1199                         self.test_session.send_packet(final)
1200                 else:
1201                     raise Exception(ppp("Received unknown packet:", p))
1202
1203                 self.assert_equal(len(self.vapi.collect_events()), 0,
1204                                   "number of bfd events")
1205             self.test_session.send_packet()
1206         self.assertTrue(echo_seen, "No echo packets received")
1207
1208     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1209     def test_echo_fail(self):
1210         """ session goes down if echo function fails """
1211         bfd_session_up(self)
1212         self.test_session.update(required_min_echo_rx=150000)
1213         self.test_session.send_packet()
1214         detection_time = self.test_session.detect_mult *\
1215             self.vpp_session.required_min_rx / USEC_IN_SEC
1216         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1217         # echo function should be used now, but we will drop the echo packets
1218         verified_diag = False
1219         for dummy in range(3):
1220             loop_until = time.time() + 0.75 * detection_time
1221             while time.time() < loop_until:
1222                 p = self.pg0.wait_for_packet(1)
1223                 self.logger.debug(ppp("Got packet:", p))
1224                 if p[UDP].dport == BFD.udp_dport_echo:
1225                     # dropped
1226                     pass
1227                 elif p.haslayer(BFD):
1228                     if "P" in p.sprintf("%BFD.flags%"):
1229                         self.assertGreaterEqual(
1230                             p[BFD].required_min_rx_interval,
1231                             1000000)
1232                         final = self.test_session.create_packet()
1233                         final[BFD].flags = "F"
1234                         self.test_session.send_packet(final)
1235                     if p[BFD].state == BFDState.down:
1236                         self.assert_equal(p[BFD].diag,
1237                                           BFDDiagCode.echo_function_failed,
1238                                           BFDDiagCode)
1239                         verified_diag = True
1240                 else:
1241                     raise Exception(ppp("Received unknown packet:", p))
1242             self.test_session.send_packet()
1243         events = self.vapi.collect_events()
1244         self.assert_equal(len(events), 1, "number of bfd events")
1245         self.assert_equal(events[0].state, BFDState.down, BFDState)
1246         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1247
1248     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1249     def test_echo_stop(self):
1250         """ echo function stops if peer sets required min echo rx zero """
1251         bfd_session_up(self)
1252         self.test_session.update(required_min_echo_rx=150000)
1253         self.test_session.send_packet()
1254         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1255         # wait for first echo packet
1256         while True:
1257             p = self.pg0.wait_for_packet(1)
1258             self.logger.debug(ppp("Got packet:", p))
1259             if p[UDP].dport == BFD.udp_dport_echo:
1260                 self.logger.debug(ppp("Looping back packet:", p))
1261                 p[Ether].dst = self.pg0.local_mac
1262                 self.pg0.add_stream(p)
1263                 self.pg_start()
1264                 break
1265             elif p.haslayer(BFD):
1266                 # ignore BFD
1267                 pass
1268             else:
1269                 raise Exception(ppp("Received unknown packet:", p))
1270         self.test_session.update(required_min_echo_rx=0)
1271         self.test_session.send_packet()
1272         # echo packets shouldn't arrive anymore
1273         for dummy in range(5):
1274             wait_for_bfd_packet(
1275                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1276             self.test_session.send_packet()
1277             events = self.vapi.collect_events()
1278             self.assert_equal(len(events), 0, "number of bfd events")
1279
1280     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1281     def test_echo_source_removed(self):
1282         """ echo function stops if echo source is removed """
1283         bfd_session_up(self)
1284         self.test_session.update(required_min_echo_rx=150000)
1285         self.test_session.send_packet()
1286         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1287         # wait for first echo packet
1288         while True:
1289             p = self.pg0.wait_for_packet(1)
1290             self.logger.debug(ppp("Got packet:", p))
1291             if p[UDP].dport == BFD.udp_dport_echo:
1292                 self.logger.debug(ppp("Looping back packet:", p))
1293                 p[Ether].dst = self.pg0.local_mac
1294                 self.pg0.add_stream(p)
1295                 self.pg_start()
1296                 break
1297             elif p.haslayer(BFD):
1298                 # ignore BFD
1299                 pass
1300             else:
1301                 raise Exception(ppp("Received unknown packet:", p))
1302         self.vapi.bfd_udp_del_echo_source()
1303         self.test_session.send_packet()
1304         # echo packets shouldn't arrive anymore
1305         for dummy in range(5):
1306             wait_for_bfd_packet(
1307                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1308             self.test_session.send_packet()
1309             events = self.vapi.collect_events()
1310             self.assert_equal(len(events), 0, "number of bfd events")
1311
1312     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1313     def test_stale_echo(self):
1314         """ stale echo packets don't keep a session up """
1315         bfd_session_up(self)
1316         self.test_session.update(required_min_echo_rx=150000)
1317         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1318         self.test_session.send_packet()
1319         # should be turned on - loopback echo packets
1320         echo_packet = None
1321         timeout_at = None
1322         timeout_ok = False
1323         for dummy in range(10 * self.vpp_session.detect_mult):
1324             p = self.pg0.wait_for_packet(1)
1325             if p[UDP].dport == BFD.udp_dport_echo:
1326                 if echo_packet is None:
1327                     self.logger.debug(ppp("Got first echo packet:", p))
1328                     echo_packet = p
1329                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1330                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1331                 else:
1332                     self.logger.debug(ppp("Got followup echo packet:", p))
1333                 self.logger.debug(ppp("Looping back first echo packet:", p))
1334                 echo_packet[Ether].dst = self.pg0.local_mac
1335                 self.pg0.add_stream(echo_packet)
1336                 self.pg_start()
1337             elif p.haslayer(BFD):
1338                 self.logger.debug(ppp("Got packet:", p))
1339                 if "P" in p.sprintf("%BFD.flags%"):
1340                     final = self.test_session.create_packet()
1341                     final[BFD].flags = "F"
1342                     self.test_session.send_packet(final)
1343                 if p[BFD].state == BFDState.down:
1344                     self.assertIsNotNone(
1345                         timeout_at,
1346                         "Session went down before first echo packet received")
1347                     now = time.time()
1348                     self.assertGreaterEqual(
1349                         now, timeout_at,
1350                         "Session timeout at %s, but is expected at %s" %
1351                         (now, timeout_at))
1352                     self.assert_equal(p[BFD].diag,
1353                                       BFDDiagCode.echo_function_failed,
1354                                       BFDDiagCode)
1355                     events = self.vapi.collect_events()
1356                     self.assert_equal(len(events), 1, "number of bfd events")
1357                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1358                     timeout_ok = True
1359                     break
1360             else:
1361                 raise Exception(ppp("Received unknown packet:", p))
1362             self.test_session.send_packet()
1363         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1364
1365     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1366     def test_invalid_echo_checksum(self):
1367         """ echo packets with invalid checksum don't keep a session up """
1368         bfd_session_up(self)
1369         self.test_session.update(required_min_echo_rx=150000)
1370         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1371         self.test_session.send_packet()
1372         # should be turned on - loopback echo packets
1373         timeout_at = None
1374         timeout_ok = False
1375         for dummy in range(10 * self.vpp_session.detect_mult):
1376             p = self.pg0.wait_for_packet(1)
1377             if p[UDP].dport == BFD.udp_dport_echo:
1378                 self.logger.debug(ppp("Got echo packet:", p))
1379                 if timeout_at is None:
1380                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1381                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1382                 p[BFD_vpp_echo].checksum = getrandbits(64)
1383                 p[Ether].dst = self.pg0.local_mac
1384                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1385                 self.pg0.add_stream(p)
1386                 self.pg_start()
1387             elif p.haslayer(BFD):
1388                 self.logger.debug(ppp("Got packet:", p))
1389                 if "P" in p.sprintf("%BFD.flags%"):
1390                     final = self.test_session.create_packet()
1391                     final[BFD].flags = "F"
1392                     self.test_session.send_packet(final)
1393                 if p[BFD].state == BFDState.down:
1394                     self.assertIsNotNone(
1395                         timeout_at,
1396                         "Session went down before first echo packet received")
1397                     now = time.time()
1398                     self.assertGreaterEqual(
1399                         now, timeout_at,
1400                         "Session timeout at %s, but is expected at %s" %
1401                         (now, timeout_at))
1402                     self.assert_equal(p[BFD].diag,
1403                                       BFDDiagCode.echo_function_failed,
1404                                       BFDDiagCode)
1405                     events = self.vapi.collect_events()
1406                     self.assert_equal(len(events), 1, "number of bfd events")
1407                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1408                     timeout_ok = True
1409                     break
1410             else:
1411                 raise Exception(ppp("Received unknown packet:", p))
1412             self.test_session.send_packet()
1413         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1414
1415     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1416     def test_admin_up_down(self):
1417         """ put session admin-up and admin-down """
1418         bfd_session_up(self)
1419         self.vpp_session.admin_down()
1420         self.pg0.enable_capture()
1421         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1422         verify_event(self, e, expected_state=BFDState.admin_down)
1423         for dummy in range(2):
1424             p = wait_for_bfd_packet(self)
1425             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1426         # try to bring session up - shouldn't be possible
1427         self.test_session.update(state=BFDState.init)
1428         self.test_session.send_packet()
1429         for dummy in range(2):
1430             p = wait_for_bfd_packet(self)
1431             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1432         self.vpp_session.admin_up()
1433         self.test_session.update(state=BFDState.down)
1434         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1435         verify_event(self, e, expected_state=BFDState.down)
1436         p = wait_for_bfd_packet(
1437             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1438         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1439         self.test_session.send_packet()
1440         p = wait_for_bfd_packet(
1441             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1442         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1443         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1444         verify_event(self, e, expected_state=BFDState.init)
1445         self.test_session.update(state=BFDState.up)
1446         self.test_session.send_packet()
1447         p = wait_for_bfd_packet(
1448             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1449         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1450         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1451         verify_event(self, e, expected_state=BFDState.up)
1452
1453     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1454     def test_config_change_remote_demand(self):
1455         """ configuration change while peer in demand mode """
1456         bfd_session_up(self)
1457         demand = self.test_session.create_packet()
1458         demand[BFD].flags = "D"
1459         self.test_session.send_packet(demand)
1460         self.vpp_session.modify_parameters(
1461             required_min_rx=2 * self.vpp_session.required_min_rx)
1462         p = wait_for_bfd_packet(
1463             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1464         # poll bit must be set
1465         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1466         # terminate poll sequence
1467         final = self.test_session.create_packet()
1468         final[BFD].flags = "D+F"
1469         self.test_session.send_packet(final)
1470         # vpp should be quiet now again
1471         transmit_time = 0.9 \
1472             * max(self.vpp_session.required_min_rx,
1473                   self.test_session.desired_min_tx) \
1474             / USEC_IN_SEC
1475         count = 0
1476         for dummy in range(self.test_session.detect_mult * 2):
1477             self.sleep(transmit_time)
1478             self.test_session.send_packet(demand)
1479             try:
1480                 p = wait_for_bfd_packet(self, timeout=0)
1481                 self.logger.error(ppp("Received unexpected packet:", p))
1482                 count += 1
1483             except CaptureTimeoutError:
1484                 pass
1485         events = self.vapi.collect_events()
1486         for e in events:
1487             self.logger.error("Received unexpected event: %s", e)
1488         self.assert_equal(count, 0, "number of packets received")
1489         self.assert_equal(len(events), 0, "number of events received")
1490
1491     def test_intf_deleted(self):
1492         """ interface with bfd session deleted """
1493         intf = VppLoInterface(self)
1494         intf.config_ip4()
1495         intf.admin_up()
1496         sw_if_index = intf.sw_if_index
1497         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1498         vpp_session.add_vpp_config()
1499         vpp_session.admin_up()
1500         intf.remove_vpp_config()
1501         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1502         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1503         self.assertFalse(vpp_session.query_vpp_config())
1504
1505
1506 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1507 class BFD6TestCase(VppTestCase):
1508     """Bidirectional Forwarding Detection (BFD) (IPv6) """
1509
1510     pg0 = None
1511     vpp_clock_offset = None
1512     vpp_session = None
1513     test_session = None
1514
1515     @classmethod
1516     def setUpClass(cls):
1517         super(BFD6TestCase, cls).setUpClass()
1518         cls.vapi.cli("set log class bfd level debug")
1519         try:
1520             cls.create_pg_interfaces([0])
1521             cls.pg0.config_ip6()
1522             cls.pg0.configure_ipv6_neighbors()
1523             cls.pg0.admin_up()
1524             cls.pg0.resolve_ndp()
1525             cls.create_loopback_interfaces(1)
1526             cls.loopback0 = cls.lo_interfaces[0]
1527             cls.loopback0.config_ip6()
1528             cls.loopback0.admin_up()
1529
1530         except Exception:
1531             super(BFD6TestCase, cls).tearDownClass()
1532             raise
1533
1534     @classmethod
1535     def tearDownClass(cls):
1536         super(BFD6TestCase, cls).tearDownClass()
1537
1538     def setUp(self):
1539         super(BFD6TestCase, self).setUp()
1540         self.factory = AuthKeyFactory()
1541         self.vapi.want_bfd_events()
1542         self.pg0.enable_capture()
1543         try:
1544             self.vpp_session = VppBFDUDPSession(self, self.pg0,
1545                                                 self.pg0.remote_ip6,
1546                                                 af=AF_INET6)
1547             self.vpp_session.add_vpp_config()
1548             self.vpp_session.admin_up()
1549             self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1550             self.logger.debug(self.vapi.cli("show adj nbr"))
1551         except:
1552             self.vapi.want_bfd_events(enable_disable=0)
1553             raise
1554
1555     def tearDown(self):
1556         if not self.vpp_dead:
1557             self.vapi.want_bfd_events(enable_disable=0)
1558         self.vapi.collect_events()  # clear the event queue
1559         super(BFD6TestCase, self).tearDown()
1560
1561     def test_session_up(self):
1562         """ bring BFD session up """
1563         bfd_session_up(self)
1564
1565     def test_session_up_by_ip(self):
1566         """ bring BFD session up - first frame looked up by address pair """
1567         self.logger.info("BFD: Sending Slow control frame")
1568         self.test_session.update(my_discriminator=randint(0, 40000000))
1569         self.test_session.send_packet()
1570         self.pg0.enable_capture()
1571         p = self.pg0.wait_for_packet(1)
1572         self.assert_equal(p[BFD].your_discriminator,
1573                           self.test_session.my_discriminator,
1574                           "BFD - your discriminator")
1575         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1576         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1577                                  state=BFDState.up)
1578         self.logger.info("BFD: Waiting for event")
1579         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1580         verify_event(self, e, expected_state=BFDState.init)
1581         self.logger.info("BFD: Sending Up")
1582         self.test_session.send_packet()
1583         self.logger.info("BFD: Waiting for event")
1584         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1585         verify_event(self, e, expected_state=BFDState.up)
1586         self.logger.info("BFD: Session is Up")
1587         self.test_session.update(state=BFDState.up)
1588         self.test_session.send_packet()
1589         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1590
1591     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1592     def test_hold_up(self):
1593         """ hold BFD session up """
1594         bfd_session_up(self)
1595         for dummy in range(self.test_session.detect_mult * 2):
1596             wait_for_bfd_packet(self)
1597             self.test_session.send_packet()
1598         self.assert_equal(len(self.vapi.collect_events()), 0,
1599                           "number of bfd events")
1600         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1601
1602     def test_echo_looped_back(self):
1603         """ echo packets looped back """
1604         # don't need a session in this case..
1605         self.vpp_session.remove_vpp_config()
1606         self.pg0.enable_capture()
1607         echo_packet_count = 10
1608         # random source port low enough to increment a few times..
1609         udp_sport_tx = randint(1, 50000)
1610         udp_sport_rx = udp_sport_tx
1611         echo_packet = (Ether(src=self.pg0.remote_mac,
1612                              dst=self.pg0.local_mac) /
1613                        IPv6(src=self.pg0.remote_ip6,
1614                             dst=self.pg0.remote_ip6) /
1615                        UDP(dport=BFD.udp_dport_echo) /
1616                        Raw("this should be looped back"))
1617         for dummy in range(echo_packet_count):
1618             self.sleep(.01, "delay between echo packets")
1619             echo_packet[UDP].sport = udp_sport_tx
1620             udp_sport_tx += 1
1621             self.logger.debug(ppp("Sending packet:", echo_packet))
1622             self.pg0.add_stream(echo_packet)
1623             self.pg_start()
1624         for dummy in range(echo_packet_count):
1625             p = self.pg0.wait_for_packet(1)
1626             self.logger.debug(ppp("Got packet:", p))
1627             ether = p[Ether]
1628             self.assert_equal(self.pg0.remote_mac,
1629                               ether.dst, "Destination MAC")
1630             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1631             ip = p[IPv6]
1632             self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1633             self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1634             udp = p[UDP]
1635             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1636                               "UDP destination port")
1637             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1638             udp_sport_rx += 1
1639             # need to compare the hex payload here, otherwise BFD_vpp_echo
1640             # gets in way
1641             self.assertEqual(scapy.compat.raw(p[UDP].payload),
1642                              scapy.compat.raw(echo_packet[UDP].payload),
1643                              "Received packet is not the echo packet sent")
1644         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1645                           "ECHO packet identifier for test purposes)")
1646         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1647                           "ECHO packet identifier for test purposes)")
1648
1649     def test_echo(self):
1650         """ echo function """
1651         bfd_session_up(self)
1652         self.test_session.update(required_min_echo_rx=150000)
1653         self.test_session.send_packet()
1654         detection_time = self.test_session.detect_mult *\
1655             self.vpp_session.required_min_rx / USEC_IN_SEC
1656         # echo shouldn't work without echo source set
1657         for dummy in range(10):
1658             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1659             self.sleep(sleep, "delay before sending bfd packet")
1660             self.test_session.send_packet()
1661         p = wait_for_bfd_packet(
1662             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1663         self.assert_equal(p[BFD].required_min_rx_interval,
1664                           self.vpp_session.required_min_rx,
1665                           "BFD required min rx interval")
1666         self.test_session.send_packet()
1667         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1668         echo_seen = False
1669         # should be turned on - loopback echo packets
1670         for dummy in range(3):
1671             loop_until = time.time() + 0.75 * detection_time
1672             while time.time() < loop_until:
1673                 p = self.pg0.wait_for_packet(1)
1674                 self.logger.debug(ppp("Got packet:", p))
1675                 if p[UDP].dport == BFD.udp_dport_echo:
1676                     self.assert_equal(
1677                         p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1678                     self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1679                                         "BFD ECHO src IP equal to loopback IP")
1680                     self.logger.debug(ppp("Looping back packet:", p))
1681                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1682                                       "ECHO packet destination MAC address")
1683                     p[Ether].dst = self.pg0.local_mac
1684                     self.pg0.add_stream(p)
1685                     self.pg_start()
1686                     echo_seen = True
1687                 elif p.haslayer(BFD):
1688                     if echo_seen:
1689                         self.assertGreaterEqual(
1690                             p[BFD].required_min_rx_interval,
1691                             1000000)
1692                     if "P" in p.sprintf("%BFD.flags%"):
1693                         final = self.test_session.create_packet()
1694                         final[BFD].flags = "F"
1695                         self.test_session.send_packet(final)
1696                 else:
1697                     raise Exception(ppp("Received unknown packet:", p))
1698
1699                 self.assert_equal(len(self.vapi.collect_events()), 0,
1700                                   "number of bfd events")
1701             self.test_session.send_packet()
1702         self.assertTrue(echo_seen, "No echo packets received")
1703
1704     def test_intf_deleted(self):
1705         """ interface with bfd session deleted """
1706         intf = VppLoInterface(self)
1707         intf.config_ip6()
1708         intf.admin_up()
1709         sw_if_index = intf.sw_if_index
1710         vpp_session = VppBFDUDPSession(
1711             self, intf, intf.remote_ip6, af=AF_INET6)
1712         vpp_session.add_vpp_config()
1713         vpp_session.admin_up()
1714         intf.remove_vpp_config()
1715         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1716         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1717         self.assertFalse(vpp_session.query_vpp_config())
1718
1719
1720 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1721 class BFDFIBTestCase(VppTestCase):
1722     """ BFD-FIB interactions (IPv6) """
1723
1724     vpp_session = None
1725     test_session = None
1726
1727     @classmethod
1728     def setUpClass(cls):
1729         super(BFDFIBTestCase, cls).setUpClass()
1730
1731     @classmethod
1732     def tearDownClass(cls):
1733         super(BFDFIBTestCase, cls).tearDownClass()
1734
1735     def setUp(self):
1736         super(BFDFIBTestCase, self).setUp()
1737         self.create_pg_interfaces(range(1))
1738
1739         self.vapi.want_bfd_events()
1740         self.pg0.enable_capture()
1741
1742         for i in self.pg_interfaces:
1743             i.admin_up()
1744             i.config_ip6()
1745             i.configure_ipv6_neighbors()
1746
1747     def tearDown(self):
1748         if not self.vpp_dead:
1749             self.vapi.want_bfd_events(enable_disable=0)
1750
1751         super(BFDFIBTestCase, self).tearDown()
1752
1753     @staticmethod
1754     def pkt_is_not_data_traffic(p):
1755         """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1756         if p.haslayer(BFD) or is_ipv6_misc(p):
1757             return True
1758         return False
1759
1760     def test_session_with_fib(self):
1761         """ BFD-FIB interactions """
1762
1763         # packets to match against both of the routes
1764         p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1765               IPv6(src="3001::1", dst="2001::1") /
1766               UDP(sport=1234, dport=1234) /
1767               Raw('\xa5' * 100)),
1768              (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1769               IPv6(src="3001::1", dst="2002::1") /
1770               UDP(sport=1234, dport=1234) /
1771               Raw('\xa5' * 100))]
1772
1773         # A recursive and a non-recursive route via a next-hop that
1774         # will have a BFD session
1775         ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1776                                   [VppRoutePath(self.pg0.remote_ip6,
1777                                                 self.pg0.sw_if_index)])
1778         ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1779                                   [VppRoutePath(self.pg0.remote_ip6,
1780                                                 0xffffffff)])
1781         ip_2001_s_64.add_vpp_config()
1782         ip_2002_s_64.add_vpp_config()
1783
1784         # bring the session up now the routes are present
1785         self.vpp_session = VppBFDUDPSession(self,
1786                                             self.pg0,
1787                                             self.pg0.remote_ip6,
1788                                             af=AF_INET6)
1789         self.vpp_session.add_vpp_config()
1790         self.vpp_session.admin_up()
1791         self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1792
1793         # session is up - traffic passes
1794         bfd_session_up(self)
1795
1796         self.pg0.add_stream(p)
1797         self.pg_start()
1798         for packet in p:
1799             captured = self.pg0.wait_for_packet(
1800                 1,
1801                 filter_out_fn=self.pkt_is_not_data_traffic)
1802             self.assertEqual(captured[IPv6].dst,
1803                              packet[IPv6].dst)
1804
1805         # session is up - traffic is dropped
1806         bfd_session_down(self)
1807
1808         self.pg0.add_stream(p)
1809         self.pg_start()
1810         with self.assertRaises(CaptureTimeoutError):
1811             self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1812
1813         # session is up - traffic passes
1814         bfd_session_up(self)
1815
1816         self.pg0.add_stream(p)
1817         self.pg_start()
1818         for packet in p:
1819             captured = self.pg0.wait_for_packet(
1820                 1,
1821                 filter_out_fn=self.pkt_is_not_data_traffic)
1822             self.assertEqual(captured[IPv6].dst,
1823                              packet[IPv6].dst)
1824
1825
1826 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1827 class BFDTunTestCase(VppTestCase):
1828     """ BFD over GRE tunnel """
1829
1830     vpp_session = None
1831     test_session = None
1832
1833     @classmethod
1834     def setUpClass(cls):
1835         super(BFDTunTestCase, cls).setUpClass()
1836
1837     @classmethod
1838     def tearDownClass(cls):
1839         super(BFDTunTestCase, cls).tearDownClass()
1840
1841     def setUp(self):
1842         super(BFDTunTestCase, self).setUp()
1843         self.create_pg_interfaces(range(1))
1844
1845         self.vapi.want_bfd_events()
1846         self.pg0.enable_capture()
1847
1848         for i in self.pg_interfaces:
1849             i.admin_up()
1850             i.config_ip4()
1851             i.resolve_arp()
1852
1853     def tearDown(self):
1854         if not self.vpp_dead:
1855             self.vapi.want_bfd_events(enable_disable=0)
1856
1857         super(BFDTunTestCase, self).tearDown()
1858
1859     @staticmethod
1860     def pkt_is_not_data_traffic(p):
1861         """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1862         if p.haslayer(BFD) or is_ipv6_misc(p):
1863             return True
1864         return False
1865
1866     def test_bfd_o_gre(self):
1867         """ BFD-o-GRE  """
1868
1869         # A GRE interface over which to run a BFD session
1870         gre_if = VppGreInterface(self,
1871                                  self.pg0.local_ip4,
1872                                  self.pg0.remote_ip4)
1873         gre_if.add_vpp_config()
1874         gre_if.admin_up()
1875         gre_if.config_ip4()
1876
1877         # bring the session up now the routes are present
1878         self.vpp_session = VppBFDUDPSession(self,
1879                                             gre_if,
1880                                             gre_if.remote_ip4,
1881                                             is_tunnel=True)
1882         self.vpp_session.add_vpp_config()
1883         self.vpp_session.admin_up()
1884
1885         self.test_session = BFDTestSession(
1886             self, gre_if, AF_INET,
1887             tunnel_header=(IP(src=self.pg0.remote_ip4,
1888                               dst=self.pg0.local_ip4) /
1889                            GRE()),
1890             phy_interface=self.pg0)
1891
1892         # packets to match against both of the routes
1893         p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1894               IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
1895               UDP(sport=1234, dport=1234) /
1896               Raw('\xa5' * 100))]
1897
1898         # session is up - traffic passes
1899         bfd_session_up(self)
1900
1901         self.send_and_expect(self.pg0, p, self.pg0)
1902
1903         # bring session down
1904         bfd_session_down(self)
1905
1906
1907 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1908 class BFDSHA1TestCase(VppTestCase):
1909     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1910
1911     pg0 = None
1912     vpp_clock_offset = None
1913     vpp_session = None
1914     test_session = None
1915
1916     @classmethod
1917     def setUpClass(cls):
1918         super(BFDSHA1TestCase, cls).setUpClass()
1919         cls.vapi.cli("set log class bfd level debug")
1920         try:
1921             cls.create_pg_interfaces([0])
1922             cls.pg0.config_ip4()
1923             cls.pg0.admin_up()
1924             cls.pg0.resolve_arp()
1925
1926         except Exception:
1927             super(BFDSHA1TestCase, cls).tearDownClass()
1928             raise
1929
1930     @classmethod
1931     def tearDownClass(cls):
1932         super(BFDSHA1TestCase, cls).tearDownClass()
1933
1934     def setUp(self):
1935         super(BFDSHA1TestCase, self).setUp()
1936         self.factory = AuthKeyFactory()
1937         self.vapi.want_bfd_events()
1938         self.pg0.enable_capture()
1939
1940     def tearDown(self):
1941         if not self.vpp_dead:
1942             self.vapi.want_bfd_events(enable_disable=0)
1943         self.vapi.collect_events()  # clear the event queue
1944         super(BFDSHA1TestCase, self).tearDown()
1945
1946     def test_session_up(self):
1947         """ bring BFD session up """
1948         key = self.factory.create_random_key(self)
1949         key.add_vpp_config()
1950         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1951                                             self.pg0.remote_ip4,
1952                                             sha1_key=key)
1953         self.vpp_session.add_vpp_config()
1954         self.vpp_session.admin_up()
1955         self.test_session = BFDTestSession(
1956             self, self.pg0, AF_INET, sha1_key=key,
1957             bfd_key_id=self.vpp_session.bfd_key_id)
1958         bfd_session_up(self)
1959
1960     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1961     def test_hold_up(self):
1962         """ hold BFD session up """
1963         key = self.factory.create_random_key(self)
1964         key.add_vpp_config()
1965         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1966                                             self.pg0.remote_ip4,
1967                                             sha1_key=key)
1968         self.vpp_session.add_vpp_config()
1969         self.vpp_session.admin_up()
1970         self.test_session = BFDTestSession(
1971             self, self.pg0, AF_INET, sha1_key=key,
1972             bfd_key_id=self.vpp_session.bfd_key_id)
1973         bfd_session_up(self)
1974         for dummy in range(self.test_session.detect_mult * 2):
1975             wait_for_bfd_packet(self)
1976             self.test_session.send_packet()
1977         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1978
1979     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1980     def test_hold_up_meticulous(self):
1981         """ hold BFD session up - meticulous auth """
1982         key = self.factory.create_random_key(
1983             self, BFDAuthType.meticulous_keyed_sha1)
1984         key.add_vpp_config()
1985         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1986                                             self.pg0.remote_ip4, sha1_key=key)
1987         self.vpp_session.add_vpp_config()
1988         self.vpp_session.admin_up()
1989         # specify sequence number so that it wraps
1990         self.test_session = BFDTestSession(
1991             self, self.pg0, AF_INET, sha1_key=key,
1992             bfd_key_id=self.vpp_session.bfd_key_id,
1993             our_seq_number=0xFFFFFFFF - 4)
1994         bfd_session_up(self)
1995         for dummy in range(30):
1996             wait_for_bfd_packet(self)
1997             self.test_session.inc_seq_num()
1998             self.test_session.send_packet()
1999         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2000
2001     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2002     def test_send_bad_seq_number(self):
2003         """ session is not kept alive by msgs with bad sequence numbers"""
2004         key = self.factory.create_random_key(
2005             self, BFDAuthType.meticulous_keyed_sha1)
2006         key.add_vpp_config()
2007         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2008                                             self.pg0.remote_ip4, sha1_key=key)
2009         self.vpp_session.add_vpp_config()
2010         self.test_session = BFDTestSession(
2011             self, self.pg0, AF_INET, sha1_key=key,
2012             bfd_key_id=self.vpp_session.bfd_key_id)
2013         bfd_session_up(self)
2014         detection_time = self.test_session.detect_mult *\
2015             self.vpp_session.required_min_rx / USEC_IN_SEC
2016         send_until = time.time() + 2 * detection_time
2017         while time.time() < send_until:
2018             self.test_session.send_packet()
2019             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
2020                        "time between bfd packets")
2021         e = self.vapi.collect_events()
2022         # session should be down now, because the sequence numbers weren't
2023         # updated
2024         self.assert_equal(len(e), 1, "number of bfd events")
2025         verify_event(self, e[0], expected_state=BFDState.down)
2026
2027     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
2028                                        legitimate_test_session,
2029                                        rogue_test_session,
2030                                        rogue_bfd_values=None):
2031         """ execute a rogue session interaction scenario
2032
2033         1. create vpp session, add config
2034         2. bring the legitimate session up
2035         3. copy the bfd values from legitimate session to rogue session
2036         4. apply rogue_bfd_values to rogue session
2037         5. set rogue session state to down
2038         6. send message to take the session down from the rogue session
2039         7. assert that the legitimate session is unaffected
2040         """
2041
2042         self.vpp_session = vpp_bfd_udp_session
2043         self.vpp_session.add_vpp_config()
2044         self.test_session = legitimate_test_session
2045         # bring vpp session up
2046         bfd_session_up(self)
2047         # send packet from rogue session
2048         rogue_test_session.update(
2049             my_discriminator=self.test_session.my_discriminator,
2050             your_discriminator=self.test_session.your_discriminator,
2051             desired_min_tx=self.test_session.desired_min_tx,
2052             required_min_rx=self.test_session.required_min_rx,
2053             detect_mult=self.test_session.detect_mult,
2054             diag=self.test_session.diag,
2055             state=self.test_session.state,
2056             auth_type=self.test_session.auth_type)
2057         if rogue_bfd_values:
2058             rogue_test_session.update(**rogue_bfd_values)
2059         rogue_test_session.update(state=BFDState.down)
2060         rogue_test_session.send_packet()
2061         wait_for_bfd_packet(self)
2062         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2063
2064     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2065     def test_mismatch_auth(self):
2066         """ session is not brought down by unauthenticated msg """
2067         key = self.factory.create_random_key(self)
2068         key.add_vpp_config()
2069         vpp_session = VppBFDUDPSession(
2070             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2071         legitimate_test_session = BFDTestSession(
2072             self, self.pg0, AF_INET, sha1_key=key,
2073             bfd_key_id=vpp_session.bfd_key_id)
2074         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2075         self.execute_rogue_session_scenario(vpp_session,
2076                                             legitimate_test_session,
2077                                             rogue_test_session)
2078
2079     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2080     def test_mismatch_bfd_key_id(self):
2081         """ session is not brought down by msg with non-existent key-id """
2082         key = self.factory.create_random_key(self)
2083         key.add_vpp_config()
2084         vpp_session = VppBFDUDPSession(
2085             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2086         # pick a different random bfd key id
2087         x = randint(0, 255)
2088         while x == vpp_session.bfd_key_id:
2089             x = randint(0, 255)
2090         legitimate_test_session = BFDTestSession(
2091             self, self.pg0, AF_INET, sha1_key=key,
2092             bfd_key_id=vpp_session.bfd_key_id)
2093         rogue_test_session = BFDTestSession(
2094             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2095         self.execute_rogue_session_scenario(vpp_session,
2096                                             legitimate_test_session,
2097                                             rogue_test_session)
2098
2099     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2100     def test_mismatched_auth_type(self):
2101         """ session is not brought down by msg with wrong auth type """
2102         key = self.factory.create_random_key(self)
2103         key.add_vpp_config()
2104         vpp_session = VppBFDUDPSession(
2105             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2106         legitimate_test_session = BFDTestSession(
2107             self, self.pg0, AF_INET, sha1_key=key,
2108             bfd_key_id=vpp_session.bfd_key_id)
2109         rogue_test_session = BFDTestSession(
2110             self, self.pg0, AF_INET, sha1_key=key,
2111             bfd_key_id=vpp_session.bfd_key_id)
2112         self.execute_rogue_session_scenario(
2113             vpp_session, legitimate_test_session, rogue_test_session,
2114             {'auth_type': BFDAuthType.keyed_md5})
2115
2116     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2117     def test_restart(self):
2118         """ simulate remote peer restart and resynchronization """
2119         key = self.factory.create_random_key(
2120             self, BFDAuthType.meticulous_keyed_sha1)
2121         key.add_vpp_config()
2122         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2123                                             self.pg0.remote_ip4, sha1_key=key)
2124         self.vpp_session.add_vpp_config()
2125         self.test_session = BFDTestSession(
2126             self, self.pg0, AF_INET, sha1_key=key,
2127             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2128         bfd_session_up(self)
2129         # don't send any packets for 2*detection_time
2130         detection_time = self.test_session.detect_mult *\
2131             self.vpp_session.required_min_rx / USEC_IN_SEC
2132         self.sleep(2 * detection_time, "simulating peer restart")
2133         events = self.vapi.collect_events()
2134         self.assert_equal(len(events), 1, "number of bfd events")
2135         verify_event(self, events[0], expected_state=BFDState.down)
2136         self.test_session.update(state=BFDState.down)
2137         # reset sequence number
2138         self.test_session.our_seq_number = 0
2139         self.test_session.vpp_seq_number = None
2140         # now throw away any pending packets
2141         self.pg0.enable_capture()
2142         self.test_session.my_discriminator = 0
2143         bfd_session_up(self)
2144
2145
2146 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2147 class BFDAuthOnOffTestCase(VppTestCase):
2148     """Bidirectional Forwarding Detection (BFD) (changing auth) """
2149
2150     pg0 = None
2151     vpp_session = None
2152     test_session = None
2153
2154     @classmethod
2155     def setUpClass(cls):
2156         super(BFDAuthOnOffTestCase, cls).setUpClass()
2157         cls.vapi.cli("set log class bfd level debug")
2158         try:
2159             cls.create_pg_interfaces([0])
2160             cls.pg0.config_ip4()
2161             cls.pg0.admin_up()
2162             cls.pg0.resolve_arp()
2163
2164         except Exception:
2165             super(BFDAuthOnOffTestCase, cls).tearDownClass()
2166             raise
2167
2168     @classmethod
2169     def tearDownClass(cls):
2170         super(BFDAuthOnOffTestCase, cls).tearDownClass()
2171
2172     def setUp(self):
2173         super(BFDAuthOnOffTestCase, self).setUp()
2174         self.factory = AuthKeyFactory()
2175         self.vapi.want_bfd_events()
2176         self.pg0.enable_capture()
2177
2178     def tearDown(self):
2179         if not self.vpp_dead:
2180             self.vapi.want_bfd_events(enable_disable=0)
2181         self.vapi.collect_events()  # clear the event queue
2182         super(BFDAuthOnOffTestCase, self).tearDown()
2183
2184     def test_auth_on_immediate(self):
2185         """ turn auth on without disturbing session state (immediate) """
2186         key = self.factory.create_random_key(self)
2187         key.add_vpp_config()
2188         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2189                                             self.pg0.remote_ip4)
2190         self.vpp_session.add_vpp_config()
2191         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2192         bfd_session_up(self)
2193         for dummy in range(self.test_session.detect_mult * 2):
2194             p = wait_for_bfd_packet(self)
2195             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2196             self.test_session.send_packet()
2197         self.vpp_session.activate_auth(key)
2198         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2199         self.test_session.sha1_key = key
2200         for dummy in range(self.test_session.detect_mult * 2):
2201             p = wait_for_bfd_packet(self)
2202             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2203             self.test_session.send_packet()
2204         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2205         self.assert_equal(len(self.vapi.collect_events()), 0,
2206                           "number of bfd events")
2207
2208     def test_auth_off_immediate(self):
2209         """ turn auth off without disturbing session state (immediate) """
2210         key = self.factory.create_random_key(self)
2211         key.add_vpp_config()
2212         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2213                                             self.pg0.remote_ip4, sha1_key=key)
2214         self.vpp_session.add_vpp_config()
2215         self.test_session = BFDTestSession(
2216             self, self.pg0, AF_INET, sha1_key=key,
2217             bfd_key_id=self.vpp_session.bfd_key_id)
2218         bfd_session_up(self)
2219         # self.vapi.want_bfd_events(enable_disable=0)
2220         for dummy in range(self.test_session.detect_mult * 2):
2221             p = wait_for_bfd_packet(self)
2222             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2223             self.test_session.inc_seq_num()
2224             self.test_session.send_packet()
2225         self.vpp_session.deactivate_auth()
2226         self.test_session.bfd_key_id = None
2227         self.test_session.sha1_key = None
2228         for dummy in range(self.test_session.detect_mult * 2):
2229             p = wait_for_bfd_packet(self)
2230             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2231             self.test_session.inc_seq_num()
2232             self.test_session.send_packet()
2233         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2234         self.assert_equal(len(self.vapi.collect_events()), 0,
2235                           "number of bfd events")
2236
2237     def test_auth_change_key_immediate(self):
2238         """ change auth key without disturbing session state (immediate) """
2239         key1 = self.factory.create_random_key(self)
2240         key1.add_vpp_config()
2241         key2 = self.factory.create_random_key(self)
2242         key2.add_vpp_config()
2243         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2244                                             self.pg0.remote_ip4, sha1_key=key1)
2245         self.vpp_session.add_vpp_config()
2246         self.test_session = BFDTestSession(
2247             self, self.pg0, AF_INET, sha1_key=key1,
2248             bfd_key_id=self.vpp_session.bfd_key_id)
2249         bfd_session_up(self)
2250         for dummy in range(self.test_session.detect_mult * 2):
2251             p = wait_for_bfd_packet(self)
2252             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2253             self.test_session.send_packet()
2254         self.vpp_session.activate_auth(key2)
2255         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2256         self.test_session.sha1_key = key2
2257         for dummy in range(self.test_session.detect_mult * 2):
2258             p = wait_for_bfd_packet(self)
2259             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2260             self.test_session.send_packet()
2261         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2262         self.assert_equal(len(self.vapi.collect_events()), 0,
2263                           "number of bfd events")
2264
2265     def test_auth_on_delayed(self):
2266         """ turn auth on without disturbing session state (delayed) """
2267         key = self.factory.create_random_key(self)
2268         key.add_vpp_config()
2269         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2270                                             self.pg0.remote_ip4)
2271         self.vpp_session.add_vpp_config()
2272         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2273         bfd_session_up(self)
2274         for dummy in range(self.test_session.detect_mult * 2):
2275             wait_for_bfd_packet(self)
2276             self.test_session.send_packet()
2277         self.vpp_session.activate_auth(key, delayed=True)
2278         for dummy in range(self.test_session.detect_mult * 2):
2279             p = wait_for_bfd_packet(self)
2280             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2281             self.test_session.send_packet()
2282         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2283         self.test_session.sha1_key = key
2284         self.test_session.send_packet()
2285         for dummy in range(self.test_session.detect_mult * 2):
2286             p = wait_for_bfd_packet(self)
2287             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2288             self.test_session.send_packet()
2289         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2290         self.assert_equal(len(self.vapi.collect_events()), 0,
2291                           "number of bfd events")
2292
2293     def test_auth_off_delayed(self):
2294         """ turn auth off without disturbing session state (delayed) """
2295         key = self.factory.create_random_key(self)
2296         key.add_vpp_config()
2297         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2298                                             self.pg0.remote_ip4, sha1_key=key)
2299         self.vpp_session.add_vpp_config()
2300         self.test_session = BFDTestSession(
2301             self, self.pg0, AF_INET, sha1_key=key,
2302             bfd_key_id=self.vpp_session.bfd_key_id)
2303         bfd_session_up(self)
2304         for dummy in range(self.test_session.detect_mult * 2):
2305             p = wait_for_bfd_packet(self)
2306             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2307             self.test_session.send_packet()
2308         self.vpp_session.deactivate_auth(delayed=True)
2309         for dummy in range(self.test_session.detect_mult * 2):
2310             p = wait_for_bfd_packet(self)
2311             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2312             self.test_session.send_packet()
2313         self.test_session.bfd_key_id = None
2314         self.test_session.sha1_key = None
2315         self.test_session.send_packet()
2316         for dummy in range(self.test_session.detect_mult * 2):
2317             p = wait_for_bfd_packet(self)
2318             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2319             self.test_session.send_packet()
2320         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2321         self.assert_equal(len(self.vapi.collect_events()), 0,
2322                           "number of bfd events")
2323
2324     def test_auth_change_key_delayed(self):
2325         """ change auth key without disturbing session state (delayed) """
2326         key1 = self.factory.create_random_key(self)
2327         key1.add_vpp_config()
2328         key2 = self.factory.create_random_key(self)
2329         key2.add_vpp_config()
2330         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2331                                             self.pg0.remote_ip4, sha1_key=key1)
2332         self.vpp_session.add_vpp_config()
2333         self.vpp_session.admin_up()
2334         self.test_session = BFDTestSession(
2335             self, self.pg0, AF_INET, sha1_key=key1,
2336             bfd_key_id=self.vpp_session.bfd_key_id)
2337         bfd_session_up(self)
2338         for dummy in range(self.test_session.detect_mult * 2):
2339             p = wait_for_bfd_packet(self)
2340             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2341             self.test_session.send_packet()
2342         self.vpp_session.activate_auth(key2, delayed=True)
2343         for dummy in range(self.test_session.detect_mult * 2):
2344             p = wait_for_bfd_packet(self)
2345             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2346             self.test_session.send_packet()
2347         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2348         self.test_session.sha1_key = key2
2349         self.test_session.send_packet()
2350         for dummy in range(self.test_session.detect_mult * 2):
2351             p = wait_for_bfd_packet(self)
2352             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2353             self.test_session.send_packet()
2354         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2355         self.assert_equal(len(self.vapi.collect_events()), 0,
2356                           "number of bfd events")
2357
2358
2359 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2360 class BFDCLITestCase(VppTestCase):
2361     """Bidirectional Forwarding Detection (BFD) (CLI) """
2362     pg0 = None
2363
2364     @classmethod
2365     def setUpClass(cls):
2366         super(BFDCLITestCase, cls).setUpClass()
2367         cls.vapi.cli("set log class bfd level debug")
2368         try:
2369             cls.create_pg_interfaces((0,))
2370             cls.pg0.config_ip4()
2371             cls.pg0.config_ip6()
2372             cls.pg0.resolve_arp()
2373             cls.pg0.resolve_ndp()
2374
2375         except Exception:
2376             super(BFDCLITestCase, cls).tearDownClass()
2377             raise
2378
2379     @classmethod
2380     def tearDownClass(cls):
2381         super(BFDCLITestCase, cls).tearDownClass()
2382
2383     def setUp(self):
2384         super(BFDCLITestCase, self).setUp()
2385         self.factory = AuthKeyFactory()
2386         self.pg0.enable_capture()
2387
2388     def tearDown(self):
2389         try:
2390             self.vapi.want_bfd_events(enable_disable=0)
2391         except UnexpectedApiReturnValueError:
2392             # some tests aren't subscribed, so this is not an issue
2393             pass
2394         self.vapi.collect_events()  # clear the event queue
2395         super(BFDCLITestCase, self).tearDown()
2396
2397     def cli_verify_no_response(self, cli):
2398         """ execute a CLI, asserting that the response is empty """
2399         self.assert_equal(self.vapi.cli(cli),
2400                           b"",
2401                           "CLI command response")
2402
2403     def cli_verify_response(self, cli, expected):
2404         """ execute a CLI, asserting that the response matches expectation """
2405         self.assert_equal(self.vapi.cli(cli).strip(),
2406                           expected,
2407                           "CLI command response")
2408
2409     def test_show(self):
2410         """ show commands """
2411         k1 = self.factory.create_random_key(self)
2412         k1.add_vpp_config()
2413         k2 = self.factory.create_random_key(
2414             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2415         k2.add_vpp_config()
2416         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2417         s1.add_vpp_config()
2418         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2419                               sha1_key=k2)
2420         s2.add_vpp_config()
2421         self.logger.info(self.vapi.ppcli("show bfd keys"))
2422         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2423         self.logger.info(self.vapi.ppcli("show bfd"))
2424
2425     def test_set_del_sha1_key(self):
2426         """ set/delete SHA1 auth key """
2427         k = self.factory.create_random_key(self)
2428         self.registry.register(k, self.logger)
2429         self.cli_verify_no_response(
2430             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2431             (k.conf_key_id,
2432                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2433         self.assertTrue(k.query_vpp_config())
2434         self.vpp_session = VppBFDUDPSession(
2435             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2436         self.vpp_session.add_vpp_config()
2437         self.test_session = \
2438             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2439                            bfd_key_id=self.vpp_session.bfd_key_id)
2440         self.vapi.want_bfd_events()
2441         bfd_session_up(self)
2442         bfd_session_down(self)
2443         # try to replace the secret for the key - should fail because the key
2444         # is in-use
2445         k2 = self.factory.create_random_key(self)
2446         self.cli_verify_response(
2447             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2448             (k.conf_key_id,
2449                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2450             "bfd key set: `bfd_auth_set_key' API call failed, "
2451             "rv=-103:BFD object in use")
2452         # manipulating the session using old secret should still work
2453         bfd_session_up(self)
2454         bfd_session_down(self)
2455         self.vpp_session.remove_vpp_config()
2456         self.cli_verify_no_response(
2457             "bfd key del conf-key-id %s" % k.conf_key_id)
2458         self.assertFalse(k.query_vpp_config())
2459
2460     def test_set_del_meticulous_sha1_key(self):
2461         """ set/delete meticulous SHA1 auth key """
2462         k = self.factory.create_random_key(
2463             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2464         self.registry.register(k, self.logger)
2465         self.cli_verify_no_response(
2466             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2467             (k.conf_key_id,
2468                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2469         self.assertTrue(k.query_vpp_config())
2470         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2471                                             self.pg0.remote_ip6, af=AF_INET6,
2472                                             sha1_key=k)
2473         self.vpp_session.add_vpp_config()
2474         self.vpp_session.admin_up()
2475         self.test_session = \
2476             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2477                            bfd_key_id=self.vpp_session.bfd_key_id)
2478         self.vapi.want_bfd_events()
2479         bfd_session_up(self)
2480         bfd_session_down(self)
2481         # try to replace the secret for the key - should fail because the key
2482         # is in-use
2483         k2 = self.factory.create_random_key(self)
2484         self.cli_verify_response(
2485             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2486             (k.conf_key_id,
2487                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2488             "bfd key set: `bfd_auth_set_key' API call failed, "
2489             "rv=-103:BFD object in use")
2490         # manipulating the session using old secret should still work
2491         bfd_session_up(self)
2492         bfd_session_down(self)
2493         self.vpp_session.remove_vpp_config()
2494         self.cli_verify_no_response(
2495             "bfd key del conf-key-id %s" % k.conf_key_id)
2496         self.assertFalse(k.query_vpp_config())
2497
2498     def test_add_mod_del_bfd_udp(self):
2499         """ create/modify/delete IPv4 BFD UDP session """
2500         vpp_session = VppBFDUDPSession(
2501             self, self.pg0, self.pg0.remote_ip4)
2502         self.registry.register(vpp_session, self.logger)
2503         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2504             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2505             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2506                                 self.pg0.remote_ip4,
2507                                 vpp_session.desired_min_tx,
2508                                 vpp_session.required_min_rx,
2509                                 vpp_session.detect_mult)
2510         self.cli_verify_no_response(cli_add_cmd)
2511         # 2nd add should fail
2512         self.cli_verify_response(
2513             cli_add_cmd,
2514             "bfd udp session add: `bfd_add_add_session' API call"
2515             " failed, rv=-101:Duplicate BFD object")
2516         verify_bfd_session_config(self, vpp_session)
2517         mod_session = VppBFDUDPSession(
2518             self, self.pg0, self.pg0.remote_ip4,
2519             required_min_rx=2 * vpp_session.required_min_rx,
2520             desired_min_tx=3 * vpp_session.desired_min_tx,
2521             detect_mult=4 * vpp_session.detect_mult)
2522         self.cli_verify_no_response(
2523             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2524             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2525             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2526              mod_session.desired_min_tx, mod_session.required_min_rx,
2527              mod_session.detect_mult))
2528         verify_bfd_session_config(self, mod_session)
2529         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2530             "peer-addr %s" % (self.pg0.name,
2531                               self.pg0.local_ip4, self.pg0.remote_ip4)
2532         self.cli_verify_no_response(cli_del_cmd)
2533         # 2nd del is expected to fail
2534         self.cli_verify_response(
2535             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2536             " failed, rv=-102:No such BFD object")
2537         self.assertFalse(vpp_session.query_vpp_config())
2538
2539     def test_add_mod_del_bfd_udp6(self):
2540         """ create/modify/delete IPv6 BFD UDP session """
2541         vpp_session = VppBFDUDPSession(
2542             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2543         self.registry.register(vpp_session, self.logger)
2544         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2545             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2546             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2547                                 self.pg0.remote_ip6,
2548                                 vpp_session.desired_min_tx,
2549                                 vpp_session.required_min_rx,
2550                                 vpp_session.detect_mult)
2551         self.cli_verify_no_response(cli_add_cmd)
2552         # 2nd add should fail
2553         self.cli_verify_response(
2554             cli_add_cmd,
2555             "bfd udp session add: `bfd_add_add_session' API call"
2556             " failed, rv=-101:Duplicate BFD object")
2557         verify_bfd_session_config(self, vpp_session)
2558         mod_session = VppBFDUDPSession(
2559             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2560             required_min_rx=2 * vpp_session.required_min_rx,
2561             desired_min_tx=3 * vpp_session.desired_min_tx,
2562             detect_mult=4 * vpp_session.detect_mult)
2563         self.cli_verify_no_response(
2564             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2565             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2566             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2567              mod_session.desired_min_tx,
2568              mod_session.required_min_rx, mod_session.detect_mult))
2569         verify_bfd_session_config(self, mod_session)
2570         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2571             "peer-addr %s" % (self.pg0.name,
2572                               self.pg0.local_ip6, self.pg0.remote_ip6)
2573         self.cli_verify_no_response(cli_del_cmd)
2574         # 2nd del is expected to fail
2575         self.cli_verify_response(
2576             cli_del_cmd,
2577             "bfd udp session del: `bfd_udp_del_session' API call"
2578             " failed, rv=-102:No such BFD object")
2579         self.assertFalse(vpp_session.query_vpp_config())
2580
2581     def test_add_mod_del_bfd_udp_auth(self):
2582         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2583         key = self.factory.create_random_key(self)
2584         key.add_vpp_config()
2585         vpp_session = VppBFDUDPSession(
2586             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2587         self.registry.register(vpp_session, self.logger)
2588         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2589             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2590             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2591             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2592                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2593                vpp_session.detect_mult, key.conf_key_id,
2594                vpp_session.bfd_key_id)
2595         self.cli_verify_no_response(cli_add_cmd)
2596         # 2nd add should fail
2597         self.cli_verify_response(
2598             cli_add_cmd,
2599             "bfd udp session add: `bfd_add_add_session' API call"
2600             " failed, rv=-101:Duplicate BFD object")
2601         verify_bfd_session_config(self, vpp_session)
2602         mod_session = VppBFDUDPSession(
2603             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2604             bfd_key_id=vpp_session.bfd_key_id,
2605             required_min_rx=2 * vpp_session.required_min_rx,
2606             desired_min_tx=3 * vpp_session.desired_min_tx,
2607             detect_mult=4 * vpp_session.detect_mult)
2608         self.cli_verify_no_response(
2609             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2610             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2611             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2612              mod_session.desired_min_tx,
2613              mod_session.required_min_rx, mod_session.detect_mult))
2614         verify_bfd_session_config(self, mod_session)
2615         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2616             "peer-addr %s" % (self.pg0.name,
2617                               self.pg0.local_ip4, self.pg0.remote_ip4)
2618         self.cli_verify_no_response(cli_del_cmd)
2619         # 2nd del is expected to fail
2620         self.cli_verify_response(
2621             cli_del_cmd,
2622             "bfd udp session del: `bfd_udp_del_session' API call"
2623             " failed, rv=-102:No such BFD object")
2624         self.assertFalse(vpp_session.query_vpp_config())
2625
2626     def test_add_mod_del_bfd_udp6_auth(self):
2627         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2628         key = self.factory.create_random_key(
2629             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2630         key.add_vpp_config()
2631         vpp_session = VppBFDUDPSession(
2632             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2633         self.registry.register(vpp_session, self.logger)
2634         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2635             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2636             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2637             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2638                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2639                vpp_session.detect_mult, key.conf_key_id,
2640                vpp_session.bfd_key_id)
2641         self.cli_verify_no_response(cli_add_cmd)
2642         # 2nd add should fail
2643         self.cli_verify_response(
2644             cli_add_cmd,
2645             "bfd udp session add: `bfd_add_add_session' API call"
2646             " failed, rv=-101:Duplicate BFD object")
2647         verify_bfd_session_config(self, vpp_session)
2648         mod_session = VppBFDUDPSession(
2649             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2650             bfd_key_id=vpp_session.bfd_key_id,
2651             required_min_rx=2 * vpp_session.required_min_rx,
2652             desired_min_tx=3 * vpp_session.desired_min_tx,
2653             detect_mult=4 * vpp_session.detect_mult)
2654         self.cli_verify_no_response(
2655             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2656             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2657             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2658              mod_session.desired_min_tx,
2659              mod_session.required_min_rx, mod_session.detect_mult))
2660         verify_bfd_session_config(self, mod_session)
2661         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2662             "peer-addr %s" % (self.pg0.name,
2663                               self.pg0.local_ip6, self.pg0.remote_ip6)
2664         self.cli_verify_no_response(cli_del_cmd)
2665         # 2nd del is expected to fail
2666         self.cli_verify_response(
2667             cli_del_cmd,
2668             "bfd udp session del: `bfd_udp_del_session' API call"
2669             " failed, rv=-102:No such BFD object")
2670         self.assertFalse(vpp_session.query_vpp_config())
2671
2672     def test_auth_on_off(self):
2673         """ turn authentication on and off """
2674         key = self.factory.create_random_key(
2675             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2676         key.add_vpp_config()
2677         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2678         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2679                                         sha1_key=key)
2680         session.add_vpp_config()
2681         cli_activate = \
2682             "bfd udp session auth activate interface %s local-addr %s "\
2683             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2684             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2685                key.conf_key_id, auth_session.bfd_key_id)
2686         self.cli_verify_no_response(cli_activate)
2687         verify_bfd_session_config(self, auth_session)
2688         self.cli_verify_no_response(cli_activate)
2689         verify_bfd_session_config(self, auth_session)
2690         cli_deactivate = \
2691             "bfd udp session auth deactivate interface %s local-addr %s "\
2692             "peer-addr %s "\
2693             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2694         self.cli_verify_no_response(cli_deactivate)
2695         verify_bfd_session_config(self, session)
2696         self.cli_verify_no_response(cli_deactivate)
2697         verify_bfd_session_config(self, session)
2698
2699     def test_auth_on_off_delayed(self):
2700         """ turn authentication on and off (delayed) """
2701         key = self.factory.create_random_key(
2702             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2703         key.add_vpp_config()
2704         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2705         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2706                                         sha1_key=key)
2707         session.add_vpp_config()
2708         cli_activate = \
2709             "bfd udp session auth activate interface %s local-addr %s "\
2710             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2711             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2712                key.conf_key_id, auth_session.bfd_key_id)
2713         self.cli_verify_no_response(cli_activate)
2714         verify_bfd_session_config(self, auth_session)
2715         self.cli_verify_no_response(cli_activate)
2716         verify_bfd_session_config(self, auth_session)
2717         cli_deactivate = \
2718             "bfd udp session auth deactivate interface %s local-addr %s "\
2719             "peer-addr %s delayed yes"\
2720             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2721         self.cli_verify_no_response(cli_deactivate)
2722         verify_bfd_session_config(self, session)
2723         self.cli_verify_no_response(cli_deactivate)
2724         verify_bfd_session_config(self, session)
2725
2726     def test_admin_up_down(self):
2727         """ put session admin-up and admin-down """
2728         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2729         session.add_vpp_config()
2730         cli_down = \
2731             "bfd udp session set-flags admin down interface %s local-addr %s "\
2732             "peer-addr %s "\
2733             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2734         cli_up = \
2735             "bfd udp session set-flags admin up interface %s local-addr %s "\
2736             "peer-addr %s "\
2737             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2738         self.cli_verify_no_response(cli_down)
2739         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2740         self.cli_verify_no_response(cli_up)
2741         verify_bfd_session_config(self, session, state=BFDState.down)
2742
2743     def test_set_del_udp_echo_source(self):
2744         """ set/del udp echo source """
2745         self.create_loopback_interfaces(1)
2746         self.loopback0 = self.lo_interfaces[0]
2747         self.loopback0.admin_up()
2748         self.cli_verify_response("show bfd echo-source",
2749                                  "UDP echo source is not set.")
2750         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2751         self.cli_verify_no_response(cli_set)
2752         self.cli_verify_response("show bfd echo-source",
2753                                  "UDP echo source is: %s\n"
2754                                  "IPv4 address usable as echo source: none\n"
2755                                  "IPv6 address usable as echo source: none" %
2756                                  self.loopback0.name)
2757         self.loopback0.config_ip4()
2758         unpacked = unpack("!L", self.loopback0.local_ip4n)
2759         echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2760         self.cli_verify_response("show bfd echo-source",
2761                                  "UDP echo source is: %s\n"
2762                                  "IPv4 address usable as echo source: %s\n"
2763                                  "IPv6 address usable as echo source: none" %
2764                                  (self.loopback0.name, echo_ip4))
2765         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2766         echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2767                                             unpacked[2], unpacked[3] ^ 1))
2768         self.loopback0.config_ip6()
2769         self.cli_verify_response("show bfd echo-source",
2770                                  "UDP echo source is: %s\n"
2771                                  "IPv4 address usable as echo source: %s\n"
2772                                  "IPv6 address usable as echo source: %s" %
2773                                  (self.loopback0.name, echo_ip4, echo_ip6))
2774         cli_del = "bfd udp echo-source del"
2775         self.cli_verify_no_response(cli_del)
2776         self.cli_verify_response("show bfd echo-source",
2777                                  "UDP echo source is not set.")
2778
2779 if __name__ == '__main__':
2780     unittest.main(testRunner=VppTestRunner)