fib: fib api updates
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python
2 """ BFD tests """
3
4 from __future__ import division
5
6 import binascii
7 import hashlib
8 import time
9 import unittest
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from struct import pack, unpack
13
14 from six import moves
15 import scapy.compat
16 from scapy.layers.inet import UDP, IP
17 from scapy.layers.inet6 import IPv6
18 from scapy.layers.l2 import Ether, GRE
19 from scapy.packet import Raw
20
21 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
22     BFDDiagCode, BFDState, BFD_vpp_echo
23 from framework import VppTestCase, VppTestRunner, running_extended_tests
24 from util import ppp
25 from vpp_ip import DpoProto
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_lo_interface import VppLoInterface
28 from vpp_papi_provider import UnexpectedApiReturnValueError
29 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
30 from vpp_gre_interface import VppGreInterface
31
32 USEC_IN_SEC = 1000000
33
34
class AuthKeyFactory(object):
    """Produce BFD auth keys whose conf key IDs never repeat.

    Uniqueness is tracked per factory instance only.
    """

    def __init__(self):
        # conf key IDs already handed out by this factory (dict used as set)
        self._conf_key_ids = {}

    def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
        """ Build a key with random material and a not-yet-used conf key ID

        :param test: test case object, passed through to VppBFDAuthKey
        :param auth_type: authentication type stored in the key
        :returns: newly constructed VppBFDAuthKey
        """
        # keep drawing 32-bit IDs until one is found that was not issued yet
        while True:
            conf_key_id = randint(0, 0xFFFFFFFF)
            if conf_key_id not in self._conf_key_ids:
                break
        self._conf_key_ids[conf_key_id] = 1
        # key material is 1 to 20 random bytes
        key_length = randint(1, 20)
        key_bytes = bytearray([randint(0, 255) for _ in range(key_length)])
        return VppBFDAuthKey(test=test, auth_type=auth_type,
                             conf_key_id=conf_key_id,
                             key=scapy.compat.raw(key_bytes))
51
52
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDAPITestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) - API"""

    pg0 = None
    pg1 = None

    @classmethod
    def setUpClass(cls):
        """Create two pg interfaces with IPv4/IPv6 addresses for all tests.

        If interface setup fails, the class-level teardown of the parent is
        invoked explicitly before the exception is propagated.
        """
        super(BFDAPITestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces(range(2))
            for i in cls.pg_interfaces:
                i.config_ip4()
                i.config_ip6()
                i.resolve_arp()

        except Exception:
            super(BFDAPITestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the parent test case."""
        super(BFDAPITestCase, cls).tearDownClass()

    def setUp(self):
        """Give every test its own key factory."""
        super(BFDAPITestCase, self).setUp()
        self.factory = AuthKeyFactory()

    def _add_and_remove_twice(self, session):
        """Add and remove `session` two times in a row.

        The second round checks that a removed session can be re-created.
        """
        for _ in range(2):
            session.add_vpp_config()
            self.logger.debug("Session state is %s", session.state)
            session.remove_vpp_config()

    def _assert_session_matches_dump(self, session):
        """Compare local session parameters with the session dump entry."""
        s = session.get_bfd_udp_session_dump_entry()
        self.assert_equal(session.desired_min_tx,
                          s.desired_min_tx,
                          "desired min transmit interval")
        self.assert_equal(session.required_min_rx,
                          s.required_min_rx,
                          "required min receive interval")
        self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")

    def test_add_bfd(self):
        """ create a BFD session """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        self._add_and_remove_twice(session)

    def test_double_add(self):
        """ create the same BFD session twice (negative case) """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()

        with self.vapi.assert_negative_api_retval():
            session.add_vpp_config()

        session.remove_vpp_config()

    def test_add_bfd6(self):
        """ create IPv6 BFD session """
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        self._add_and_remove_twice(session)

    def test_mod_bfd(self):
        """ modify BFD session parameters """
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   desired_min_tx=50000,
                                   required_min_rx=10000,
                                   detect_mult=1)
        session.add_vpp_config()
        self._assert_session_matches_dump(session)
        # double every parameter and check that the dump follows
        session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
                                  required_min_rx=session.required_min_rx * 2,
                                  detect_mult=session.detect_mult * 2)
        self._assert_session_matches_dump(session)

    def test_add_sha1_keys(self):
        """ add SHA1 keys """
        key_count = 10
        keys = [self.factory.create_random_key(self)
                for _ in range(key_count)]
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        # remove randomly
        # shuffle() works in place and needs a mutable sequence; a bare
        # range object is immutable on Python 3, hence the list() copy
        indexes = list(range(key_count))
        shuffle(indexes)
        removed = set()
        for i in indexes:
            keys[i].remove_vpp_config()
            removed.add(i)
            # after each removal only the removed keys may be gone
            for j, key in enumerate(keys):
                if j in removed:
                    self.assertFalse(key.query_vpp_config())
                else:
                    self.assertTrue(key.query_vpp_config())
        # should be removed now
        for key in keys:
            self.assertFalse(key.query_vpp_config())
        # add back and remove again
        for key in keys:
            key.add_vpp_config()
        for key in keys:
            self.assertTrue(key.query_vpp_config())
        for key in keys:
            key.remove_vpp_config()
        for key in keys:
            self.assertFalse(key.query_vpp_config())

    def test_add_bfd_sha1(self):
        """ create a BFD session (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        self._add_and_remove_twice(session)

    def test_double_add_sha1(self):
        """ create the same BFD session twice (negative case) (SHA1) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key)
        session.add_vpp_config()
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_add_auth_nonexistent_key(self):
        """ create BFD session using non-existent SHA1 (negative case) """
        # the key object is created but never added to the configuration
        session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip4,
            sha1_key=self.factory.create_random_key(self))
        with self.assertRaises(Exception):
            session.add_vpp_config()

    def test_shared_sha1_key(self):
        """ share single SHA1 key between multiple BFD sessions """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        sessions = [
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
                             sha1_key=key, af=AF_INET6),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
                             sha1_key=key),
            VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
                             sha1_key=key, af=AF_INET6)]
        for s in sessions:
            s.add_vpp_config()
        # the use count must drop by one with every removed session
        removed = 0
        for s in sessions:
            e = key.get_bfd_auth_keys_dump_entry()
            self.assert_equal(e.use_count, len(sessions) - removed,
                              "Use count for shared key")
            s.remove_vpp_config()
            removed += 1
        e = key.get_bfd_auth_keys_dump_entry()
        self.assert_equal(e.use_count, len(sessions) - removed,
                          "Use count for shared key")

    def test_activate_auth(self):
        """ activate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)

    def test_deactivate_auth(self):
        """ deactivate SHA1 authentication """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
        session.add_vpp_config()
        session.activate_auth(key)
        session.deactivate_auth()

    def test_change_key(self):
        """ change SHA1 key """
        key1 = self.factory.create_random_key(self)
        key2 = self.factory.create_random_key(self)
        while key2.conf_key_id == key1.conf_key_id:
            key2 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2.add_vpp_config()
        session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
                                   sha1_key=key1)
        session.add_vpp_config()
        session.activate_auth(key2)

    def test_set_del_udp_echo_source(self):
        """ set/del udp echo source """
        self.create_loopback_interfaces(1)
        self.loopback0 = self.lo_interfaces[0]
        self.loopback0.admin_up()
        # nothing is configured yet
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertFalse(echo_source.is_set)
        self.assertFalse(echo_source.have_usable_ip4)
        self.assertFalse(echo_source.have_usable_ip6)

        # interface set as echo source, but it has no addresses yet
        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertTrue(echo_source.is_set)
        self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
        self.assertFalse(echo_source.have_usable_ip4)
        self.assertFalse(echo_source.have_usable_ip6)

        # expected IPv4 echo address is the interface address with its
        # lowest bit flipped
        self.loopback0.config_ip4()
        unpacked = unpack("!L", self.loopback0.local_ip4n)
        echo_ip4 = pack("!L", unpacked[0] ^ 1)
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertTrue(echo_source.is_set)
        self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
        self.assertTrue(echo_source.have_usable_ip4)
        self.assertEqual(echo_source.ip4_addr, echo_ip4)
        self.assertFalse(echo_source.have_usable_ip6)

        # same derivation for IPv6: flip the lowest bit of the address
        self.loopback0.config_ip6()
        unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
        echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
                        unpacked[3] ^ 1)
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertTrue(echo_source.is_set)
        self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
        self.assertTrue(echo_source.have_usable_ip4)
        self.assertEqual(echo_source.ip4_addr, echo_ip4)
        self.assertTrue(echo_source.have_usable_ip6)
        self.assertEqual(echo_source.ip6_addr, echo_ip6)

        # deleting the echo source resets all flags
        self.vapi.bfd_udp_del_echo_source()
        echo_source = self.vapi.bfd_udp_get_echo_source()
        self.assertFalse(echo_source.is_set)
        self.assertFalse(echo_source.have_usable_ip4)
        self.assertFalse(echo_source.have_usable_ip6)
310
311
class BFDTestSession(object):
    """ BFD session as seen from test framework side """

    # (session attribute, BFD packet field, name used in the debug log);
    # fill_packet_fields() walks this table in order
    _PACKET_FIELDS = (
        ("my_discriminator", "my_discriminator", "my_discriminator"),
        ("your_discriminator", "your_discriminator", "your_discriminator"),
        ("required_min_rx", "required_min_rx_interval",
         "required_min_rx_interval"),
        ("required_min_echo_rx", "required_min_echo_rx_interval",
         "required_min_echo_rx"),
        ("desired_min_tx", "desired_min_tx_interval",
         "desired_min_tx_interval"),
        ("detect_mult", "detect_mult", "detect_mult"),
        ("diag", "diag", "diag"),
        ("state", "state", "state"),
        # auth_type is overridden this way by a negative test-case
        ("auth_type", "auth_type", "auth_type"),
    )

    def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
                 bfd_key_id=None, our_seq_number=None,
                 tunnel_header=None, phy_interface=None):
        """ Set up the test-side session state

        :param test: test case object (used for logging and assertions)
        :param interface: interface whose addresses go into the IP header
        :param af: address family, AF_INET6 selects IPv6, anything else IPv4
        :param detect_mult: detect multiplier put into created packets
        :param sha1_key: auth key; when set, created packets are authenticated
        :param bfd_key_id: key ID put into the authentication section
        :param our_seq_number: initial auth sequence number, random if None
        :param tunnel_header: optional layer(s) inserted right after Ether
        :param phy_interface: interface used for MAC addresses and sending,
                              defaults to `interface`
        """
        self.test = test
        self.af = af
        self.sha1_key = sha1_key
        self.bfd_key_id = bfd_key_id
        self.interface = interface
        self.phy_interface = phy_interface if phy_interface else interface
        self.udp_sport = randint(49152, 65535)
        if our_seq_number is None:
            self.our_seq_number = randint(0, 40000000)
        else:
            self.our_seq_number = our_seq_number
        # last sequence number received from the peer, None until first seen
        self.vpp_seq_number = None
        self.my_discriminator = 0
        self.desired_min_tx = 300000
        self.required_min_rx = 300000
        self.required_min_echo_rx = None
        self.detect_mult = detect_mult
        self.diag = BFDDiagCode.no_diagnostic
        self.your_discriminator = None
        self.state = BFDState.down
        self.auth_type = BFDAuthType.no_auth
        self.tunnel_header = tunnel_header

    def inc_seq_num(self):
        """ increment sequence number, wrapping if needed """
        if self.our_seq_number == 0xFFFFFFFF:
            self.our_seq_number = 0
        else:
            self.our_seq_number += 1

    def update(self, my_discriminator=None, your_discriminator=None,
               desired_min_tx=None, required_min_rx=None,
               required_min_echo_rx=None, detect_mult=None,
               diag=None, state=None, auth_type=None):
        """ update BFD parameters associated with session

        Only arguments which are not None are stored; everything else keeps
        its current value.
        """
        new_values = {
            "my_discriminator": my_discriminator,
            "your_discriminator": your_discriminator,
            "required_min_rx": required_min_rx,
            "required_min_echo_rx": required_min_echo_rx,
            "desired_min_tx": desired_min_tx,
            "detect_mult": detect_mult,
            "diag": diag,
            "state": state,
            "auth_type": auth_type,
        }
        for name, value in new_values.items():
            if value is not None:
                setattr(self, name, value)

    def fill_packet_fields(self, packet):
        """ set packet fields with known values in packet

        Session attributes which are falsy (None or 0) are not written, so
        the corresponding packet fields keep whatever value they had.
        """
        bfd = packet[BFD]
        for attr, field, label in self._PACKET_FIELDS:
            value = getattr(self, attr)
            if value:
                self.test.logger.debug("BFD: setting packet.%s=%s",
                                       label, value)
                setattr(bfd, field, value)

    def create_packet(self):
        """ create a BFD packet, reflecting the current state of session

        :returns: Ether / [tunnel header] / IP or IPv6 / UDP / BFD packet
        """
        if self.sha1_key:
            bfd = BFD(flags="A")
            bfd.auth_type = self.sha1_key.auth_type
            bfd.auth_len = BFD.sha1_auth_len
            bfd.auth_key_id = self.bfd_key_id
            bfd.auth_seq_num = self.our_seq_number
            bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
        else:
            bfd = BFD()
        packet = Ether(src=self.phy_interface.remote_mac,
                       dst=self.phy_interface.local_mac)
        if self.tunnel_header:
            packet = packet / self.tunnel_header
        if self.af == AF_INET6:
            packet = (packet /
                      IPv6(src=self.interface.remote_ip6,
                           dst=self.interface.local_ip6,
                           hlim=255) /
                      UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
                      bfd)
        else:
            packet = (packet /
                      IP(src=self.interface.remote_ip4,
                         dst=self.interface.local_ip4,
                         ttl=255) /
                      UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
                      bfd)
        self.test.logger.debug("BFD: Creating packet")
        self.fill_packet_fields(packet)
        if self.sha1_key:
            # hash covers the first 32 bytes of the BFD layer followed by
            # the key padded with zero bytes to 20 bytes; the padding must
            # be bytes (not str) to concatenate on Python 3
            hash_material = scapy.compat.raw(
                packet[BFD])[:32] + self.sha1_key.key + \
                b"\0" * (20 - len(self.sha1_key.key))
            sha1 = hashlib.sha1(hash_material)
            self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
                                   sha1.hexdigest())
            packet[BFD].auth_key_hash = sha1.digest()
        return packet

    def send_packet(self, packet=None, interface=None):
        """ send packet on interface, creating the packet if needed

        :param packet: packet to send, built by create_packet() if None
        :param interface: interface to send on, phy_interface if None
        """
        if packet is None:
            packet = self.create_packet()
        if interface is None:
            interface = self.phy_interface
        self.test.logger.debug(ppp("Sending packet:", packet))
        interface.add_stream(packet)
        self.test.pg_start()

    def verify_sha1_auth(self, packet):
        """ Verify correctness of authentication in BFD layer.

        Checks the fixed fields of the auth section, the progression of the
        peer's sequence number (remembered in vpp_seq_number) and the hash.
        """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
        self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
                               BFDAuthType)
        self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
        self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
        if self.vpp_seq_number is None:
            self.vpp_seq_number = bfd.auth_seq_num
            self.test.logger.debug("Received initial sequence number: %s" %
                                   self.vpp_seq_number)
        else:
            recvd_seq_num = bfd.auth_seq_num
            self.test.logger.debug("Received followup sequence number: %s" %
                                   recvd_seq_num)
            # meticulous mode: number must advance by exactly one;
            # otherwise it may stay the same or advance by one
            meticulous = self.sha1_key.auth_type == \
                BFDAuthType.meticulous_keyed_sha1
            if self.vpp_seq_number < 0xffffffff:
                if meticulous:
                    self.test.assert_equal(recvd_seq_num,
                                           self.vpp_seq_number + 1,
                                           "BFD sequence number")
                else:
                    self.test.assert_in_range(recvd_seq_num,
                                              self.vpp_seq_number,
                                              self.vpp_seq_number + 1,
                                              "BFD sequence number")
            else:
                # previous number was the 32-bit maximum, next one wraps to 0
                if meticulous:
                    self.test.assert_equal(recvd_seq_num, 0,
                                           "BFD sequence number")
                else:
                    self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
                                       "BFD sequence number not one of "
                                       "(%s, 0)" % self.vpp_seq_number)
            self.vpp_seq_number = recvd_seq_num
        # last 20 bytes represent the hash - so replace them with the key,
        # pad the result with zeros and hash the result
        hash_material = bfd.original[:-20] + self.sha1_key.key + \
            b"\0" * (20 - len(self.sha1_key.key))
        expected_hash = hashlib.sha1(hash_material).digest()
        # hexlify both sides so the compared values have the same type on
        # Python 2 and Python 3 and are readable in a failure message
        self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
                               binascii.hexlify(expected_hash),
                               "Auth key hash")

    def verify_bfd(self, packet):
        """ Verify correctness of BFD layer. """
        bfd = packet[BFD]
        self.test.assert_equal(bfd.version, 1, "BFD version")
        self.test.assert_equal(bfd.your_discriminator,
                               self.my_discriminator,
                               "BFD - your discriminator")
        if self.sha1_key:
            self.verify_sha1_auth(packet)
521
522
def bfd_session_up(test):
    """ Bring BFD session up

    Waits for a packet from the peer, answers with Init, waits for the
    session-up event and then confirms with an Up packet.
    """

    def bump_seq_num_if_meticulous():
        # sequence number is advanced before sending only in meticulous mode
        key = test.test_session.sha1_key
        if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
            test.test_session.inc_seq_num()

    test.logger.info("BFD: Waiting for slow hello")
    hello = wait_for_bfd_packet(test, 2,
                                is_tunnel=test.vpp_session.is_tunnel)
    # remember the previous offset (if any) to check clock stability below
    old_offset = getattr(test, 'vpp_clock_offset', None)
    test.vpp_clock_offset = time.time() - hello.time
    test.logger.debug("BFD: Calculated vpp clock offset: %s",
                      test.vpp_clock_offset)
    if old_offset:
        test.assertAlmostEqual(
            old_offset, test.vpp_clock_offset, delta=0.5,
            msg="vpp clock offset not stable (new: %s, old: %s)" %
            (test.vpp_clock_offset, old_offset))

    test.logger.info("BFD: Sending Init")
    test.test_session.update(my_discriminator=randint(0, 40000000),
                             your_discriminator=hello[BFD].my_discriminator,
                             state=BFDState.init)
    bump_seq_num_if_meticulous()
    test.test_session.send_packet()

    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.up)

    test.logger.info("BFD: Session is Up")
    test.test_session.update(state=BFDState.up)
    bump_seq_num_if_meticulous()
    test.test_session.send_packet()
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
556
557
def bfd_session_down(test):
    """ Bring BFD session down

    Expects the session to be up, sends a packet with state Down and waits
    for the matching event.
    """
    test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
    test.test_session.update(state=BFDState.down)
    # sequence number is advanced before sending only in meticulous mode
    key = test.test_session.sha1_key
    if key and key.auth_type == BFDAuthType.meticulous_keyed_sha1:
        test.test_session.inc_seq_num()
    test.test_session.send_packet()

    test.logger.info("BFD: Waiting for event")
    event = test.vapi.wait_for_event(1, "bfd_udp_session_details")
    verify_event(test, event, expected_state=BFDState.down)

    test.logger.info("BFD: Session is Down")
    test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
571
572
def verify_bfd_session_config(test, session, state=None):
    """ Verify that the session dump entry matches the session object

    :param test: test case object providing the assertion helpers
    :param session: session object to compare against its dump entry
    :param state: expected session state, not checked when None
    """
    dump = session.get_bfd_udp_session_dump_entry()
    test.assertIsNotNone(dump)
    # since dump is not none, we have verified that sw_if_index and addresses
    # are valid (in get_bfd_udp_session_dump_entry)
    # compare against None explicitly - an expected state whose numeric
    # value is 0 is falsy and would otherwise never be checked
    if state is not None:
        test.assert_equal(dump.state, state, "session state")
    test.assert_equal(dump.required_min_rx, session.required_min_rx,
                      "required min rx interval")
    test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
                      "desired min tx interval")
    test.assert_equal(dump.detect_mult, session.detect_mult,
                      "detect multiplier")
    if session.sha1_key is None:
        test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
    else:
        test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
        test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
                          "bfd key id")
        test.assert_equal(dump.conf_key_id,
                          session.sha1_key.conf_key_id,
                          "config key id")
595
596
def verify_ip(test, packet):
    """ Verify correctness of IP layer.

    Hop limit/TTL must be 255, source must be the local address and
    destination the remote address of the VPP session's interface.
    """
    if test.vpp_session.af == AF_INET6:
        header = packet[IPv6]
        expected_src = test.vpp_session.interface.local_ip6
        expected_dst = test.vpp_session.interface.remote_ip6
        test.assert_equal(header.hlim, 255, "IPv6 hop limit")
    else:
        header = packet[IP]
        expected_src = test.vpp_session.interface.local_ip4
        expected_dst = test.vpp_session.interface.remote_ip4
        test.assert_equal(header.ttl, 255, "IPv4 TTL")
    test.assert_equal(header.src, expected_src, "IP source address")
    test.assert_equal(header.dst, expected_dst, "IP destination address")
611
612
def verify_udp(test, packet):
    """ Verify correctness of UDP layer.

    Destination port must be the BFD port, source port must lie within the
    BFD source port range.
    """
    udp_header = packet[UDP]
    test.assert_equal(udp_header.dport, BFD.udp_dport,
                      "UDP destination port")
    test.assert_in_range(udp_header.sport,
                         BFD.udp_sport_min, BFD.udp_sport_max,
                         "UDP source port")
619
620
def verify_event(test, event, expected_state):
    """ Verify correctness of event values.

    :param test: test case object holding the VPP session
    :param event: received event to check
    :param expected_state: state the event must report
    """
    test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(event))
    test.assert_equal(event.sw_if_index,
                      test.vpp_session.interface.sw_if_index,
                      "BFD interface index")
    expected_is_ipv6 = 1 if test.vpp_session.af == AF_INET6 else 0
    test.assert_equal(event.is_ipv6, expected_is_ipv6, "is_ipv6")
    if test.vpp_session.af != AF_INET:
        test.assert_equal(event.local_addr, test.vpp_session.local_addr_n,
                          "Local IPv6 address")
        test.assert_equal(event.peer_addr, test.vpp_session.peer_addr_n,
                          "Peer IPv6 address")
    else:
        # for IPv4 only the first four bytes of the address fields are used
        test.assert_equal(event.local_addr[:4],
                          test.vpp_session.local_addr_n,
                          "Local IPv4 address")
        test.assert_equal(event.peer_addr[:4],
                          test.vpp_session.peer_addr_n,
                          "Peer IPv4 address")
    test.assert_equal(event.state, expected_state, BFDState)
643
644
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
    """ wait for BFD packet and verify its correctness

    :param test: test case object (packets are captured on test.pg0)
    :param timeout: how long to wait
    :param pcap_time_min: ignore packets with pcap timestamp lower than this
    :param is_tunnel: if true, the outer IP header of the captured packet is
                      stripped and the verification runs on its payload

    :returns: the verified packet (the inner packet if is_tunnel is set)
    :raises CaptureTimeoutError: if no suitable packet arrives in time
    """
    test.logger.info("BFD: Waiting for BFD packet")
    deadline = time.time() + timeout
    counter = 0
    while True:
        counter += 1
        # sanity check
        test.assert_in_range(counter, 0, 100, "number of packets ignored")
        time_left = deadline - time.time()
        if time_left < 0:
            raise CaptureTimeoutError("Packet did not arrive within timeout")
        p = test.pg0.wait_for_packet(timeout=time_left)
        test.logger.debug(ppp("BFD: Got packet:", p))
        if pcap_time_min is not None and p.time < pcap_time_min:
            test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
                                  "pcap time min %s):" %
                                  (p.time, pcap_time_min), p))
        else:
            break
    if is_tunnel:
        # strip an IP layer and move to the next
        p = p[IP].payload

    # getlayer() returns None for a missing layer, whereas p[BFD] raises
    # IndexError and would never reach the check below
    bfd = p.getlayer(BFD)
    if bfd is None:
        raise Exception(ppp("Unexpected or invalid BFD packet:", p))
    if bfd.payload:
        raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
    verify_ip(test, p)
    verify_udp(test, p)
    test.test_session.verify_bfd(p)
    return p
684
685
686 @unittest.skipUnless(running_extended_tests, "part of extended tests")
687 class BFD4TestCase(VppTestCase):
688     """Bidirectional Forwarding Detection (BFD)"""
689
690     pg0 = None
691     vpp_clock_offset = None
692     vpp_session = None
693     test_session = None
694
695     @classmethod
696     def setUpClass(cls):
697         super(BFD4TestCase, cls).setUpClass()
698         cls.vapi.cli("set log class bfd level debug")
699         try:
700             cls.create_pg_interfaces([0])
701             cls.create_loopback_interfaces(1)
702             cls.loopback0 = cls.lo_interfaces[0]
703             cls.loopback0.config_ip4()
704             cls.loopback0.admin_up()
705             cls.pg0.config_ip4()
706             cls.pg0.configure_ipv4_neighbors()
707             cls.pg0.admin_up()
708             cls.pg0.resolve_arp()
709
710         except Exception:
711             super(BFD4TestCase, cls).tearDownClass()
712             raise
713
    @classmethod
    def tearDownClass(cls):
        """ class-level cleanup; nothing BFD specific, defers to the parent """
        super(BFD4TestCase, cls).tearDownClass()
717
    def setUp(self):
        """ per-test setup: request BFD events and create both sessions

        Creates a fresh AuthKeyFactory, calls want_bfd_events() and enables
        capture on pg0, then configures a VppBFDUDPSession towards
        pg0.remote_ip4, sets it admin-up and creates the BFDTestSession the
        tests use to send their own BFD packets.
        """
        super(BFD4TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip4)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        except:
            # switch BFD events off again before propagating the failure
            # (bare except: this also runs for non-Exception errors)
            self.vapi.want_bfd_events(enable_disable=0)
            raise
732
    def tearDown(self):
        """ per-test cleanup: switch BFD events off and drain the queue """
        # the API call is skipped when VPP is already marked dead
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD4TestCase, self).tearDown()
738
    def test_session_up(self):
        """ bring BFD session up """
        # the whole scenario is implemented by the shared bfd_session_up()
        # helper, which receives this test case instance
        bfd_session_up(self)
742
743     def test_session_up_by_ip(self):
744         """ bring BFD session up - first frame looked up by address pair """
745         self.logger.info("BFD: Sending Slow control frame")
746         self.test_session.update(my_discriminator=randint(0, 40000000))
747         self.test_session.send_packet()
748         self.pg0.enable_capture()
749         p = self.pg0.wait_for_packet(1)
750         self.assert_equal(p[BFD].your_discriminator,
751                           self.test_session.my_discriminator,
752                           "BFD - your discriminator")
753         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
754         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
755                                  state=BFDState.up)
756         self.logger.info("BFD: Waiting for event")
757         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
758         verify_event(self, e, expected_state=BFDState.init)
759         self.logger.info("BFD: Sending Up")
760         self.test_session.send_packet()
761         self.logger.info("BFD: Waiting for event")
762         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
763         verify_event(self, e, expected_state=BFDState.up)
764         self.logger.info("BFD: Session is Up")
765         self.test_session.update(state=BFDState.up)
766         self.test_session.send_packet()
767         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
768
    def test_session_down(self):
        """ bring BFD session down """
        # both halves are implemented by shared helpers: the session is first
        # brought up and then taken down again
        bfd_session_up(self)
        bfd_session_down(self)
773
774     @unittest.skipUnless(running_extended_tests, "part of extended tests")
775     def test_hold_up(self):
776         """ hold BFD session up """
777         bfd_session_up(self)
778         for dummy in range(self.test_session.detect_mult * 2):
779             wait_for_bfd_packet(self)
780             self.test_session.send_packet()
781         self.assert_equal(len(self.vapi.collect_events()), 0,
782                           "number of bfd events")
783
784     @unittest.skipUnless(running_extended_tests, "part of extended tests")
785     def test_slow_timer(self):
786         """ verify slow periodic control frames while session down """
787         packet_count = 3
788         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
789         prev_packet = wait_for_bfd_packet(self, 2)
790         for dummy in range(packet_count):
791             next_packet = wait_for_bfd_packet(self, 2)
792             time_diff = next_packet.time - prev_packet.time
793             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
794             # to work around timing issues
795             self.assert_in_range(
796                 time_diff, 0.70, 1.05, "time between slow packets")
797             prev_packet = next_packet
798
799     @unittest.skipUnless(running_extended_tests, "part of extended tests")
800     def test_zero_remote_min_rx(self):
801         """ no packets when zero remote required min rx interval """
802         bfd_session_up(self)
803         self.test_session.update(required_min_rx=0)
804         self.test_session.send_packet()
805         for dummy in range(self.test_session.detect_mult):
806             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
807                        "sleep before transmitting bfd packet")
808             self.test_session.send_packet()
809             try:
810                 p = wait_for_bfd_packet(self, timeout=0)
811                 self.logger.error(ppp("Received unexpected packet:", p))
812             except CaptureTimeoutError:
813                 pass
814         self.assert_equal(
815             len(self.vapi.collect_events()), 0, "number of bfd events")
816         self.test_session.update(required_min_rx=300000)
817         for dummy in range(3):
818             self.test_session.send_packet()
819             wait_for_bfd_packet(
820                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
821         self.assert_equal(
822             len(self.vapi.collect_events()), 0, "number of bfd events")
823
824     @unittest.skipUnless(running_extended_tests, "part of extended tests")
825     def test_conn_down(self):
826         """ verify session goes down after inactivity """
827         bfd_session_up(self)
828         detection_time = self.test_session.detect_mult *\
829             self.vpp_session.required_min_rx / USEC_IN_SEC
830         self.sleep(detection_time, "waiting for BFD session time-out")
831         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
832         verify_event(self, e, expected_state=BFDState.down)
833
834     @unittest.skipUnless(running_extended_tests, "part of extended tests")
835     def test_large_required_min_rx(self):
836         """ large remote required min rx interval """
837         bfd_session_up(self)
838         p = wait_for_bfd_packet(self)
839         interval = 3000000
840         self.test_session.update(required_min_rx=interval)
841         self.test_session.send_packet()
842         time_mark = time.time()
843         count = 0
844         # busy wait here, trying to collect a packet or event, vpp is not
845         # allowed to send packets and the session will timeout first - so the
846         # Up->Down event must arrive before any packets do
847         while time.time() < time_mark + interval / USEC_IN_SEC:
848             try:
849                 p = wait_for_bfd_packet(self, timeout=0)
850                 # if vpp managed to send a packet before we did the session
851                 # session update, then that's fine, ignore it
852                 if p.time < time_mark - self.vpp_clock_offset:
853                     continue
854                 self.logger.error(ppp("Received unexpected packet:", p))
855                 count += 1
856             except CaptureTimeoutError:
857                 pass
858             events = self.vapi.collect_events()
859             if len(events) > 0:
860                 verify_event(self, events[0], BFDState.down)
861                 break
862         self.assert_equal(count, 0, "number of packets received")
863
864     @unittest.skipUnless(running_extended_tests, "part of extended tests")
865     def test_immediate_remote_min_rx_reduction(self):
866         """ immediately honor remote required min rx reduction """
867         self.vpp_session.remove_vpp_config()
868         self.vpp_session = VppBFDUDPSession(
869             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
870         self.pg0.enable_capture()
871         self.vpp_session.add_vpp_config()
872         self.test_session.update(desired_min_tx=1000000,
873                                  required_min_rx=1000000)
874         bfd_session_up(self)
875         reference_packet = wait_for_bfd_packet(self)
876         time_mark = time.time()
877         interval = 300000
878         self.test_session.update(required_min_rx=interval)
879         self.test_session.send_packet()
880         extra_time = time.time() - time_mark
881         p = wait_for_bfd_packet(self)
882         # first packet is allowed to be late by time we spent doing the update
883         # calculated in extra_time
884         self.assert_in_range(p.time - reference_packet.time,
885                              .95 * 0.75 * interval / USEC_IN_SEC,
886                              1.05 * interval / USEC_IN_SEC + extra_time,
887                              "time between BFD packets")
888         reference_packet = p
889         for dummy in range(3):
890             p = wait_for_bfd_packet(self)
891             diff = p.time - reference_packet.time
892             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
893                                  1.05 * interval / USEC_IN_SEC,
894                                  "time between BFD packets")
895             reference_packet = p
896
897     @unittest.skipUnless(running_extended_tests, "part of extended tests")
898     def test_modify_req_min_rx_double(self):
899         """ modify session - double required min rx """
900         bfd_session_up(self)
901         p = wait_for_bfd_packet(self)
902         self.test_session.update(desired_min_tx=10000,
903                                  required_min_rx=10000)
904         self.test_session.send_packet()
905         # double required min rx
906         self.vpp_session.modify_parameters(
907             required_min_rx=2 * self.vpp_session.required_min_rx)
908         p = wait_for_bfd_packet(
909             self, pcap_time_min=time.time() - self.vpp_clock_offset)
910         # poll bit needs to be set
911         self.assertIn("P", p.sprintf("%BFD.flags%"),
912                       "Poll bit not set in BFD packet")
913         # finish poll sequence with final packet
914         final = self.test_session.create_packet()
915         final[BFD].flags = "F"
916         timeout = self.test_session.detect_mult * \
917             max(self.test_session.desired_min_tx,
918                 self.vpp_session.required_min_rx) / USEC_IN_SEC
919         self.test_session.send_packet(final)
920         time_mark = time.time()
921         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
922         verify_event(self, e, expected_state=BFDState.down)
923         time_to_event = time.time() - time_mark
924         self.assert_in_range(time_to_event, .9 * timeout,
925                              1.1 * timeout, "session timeout")
926
927     @unittest.skipUnless(running_extended_tests, "part of extended tests")
928     def test_modify_req_min_rx_halve(self):
929         """ modify session - halve required min rx """
930         self.vpp_session.modify_parameters(
931             required_min_rx=2 * self.vpp_session.required_min_rx)
932         bfd_session_up(self)
933         p = wait_for_bfd_packet(self)
934         self.test_session.update(desired_min_tx=10000,
935                                  required_min_rx=10000)
936         self.test_session.send_packet()
937         p = wait_for_bfd_packet(
938             self, pcap_time_min=time.time() - self.vpp_clock_offset)
939         # halve required min rx
940         old_required_min_rx = self.vpp_session.required_min_rx
941         self.vpp_session.modify_parameters(
942             required_min_rx=self.vpp_session.required_min_rx // 2)
943         # now we wait 0.8*3*old-req-min-rx and the session should still be up
944         self.sleep(0.8 * self.vpp_session.detect_mult *
945                    old_required_min_rx / USEC_IN_SEC,
946                    "wait before finishing poll sequence")
947         self.assert_equal(len(self.vapi.collect_events()), 0,
948                           "number of bfd events")
949         p = wait_for_bfd_packet(self)
950         # poll bit needs to be set
951         self.assertIn("P", p.sprintf("%BFD.flags%"),
952                       "Poll bit not set in BFD packet")
953         # finish poll sequence with final packet
954         final = self.test_session.create_packet()
955         final[BFD].flags = "F"
956         self.test_session.send_packet(final)
957         # now the session should time out under new conditions
958         detection_time = self.test_session.detect_mult *\
959             self.vpp_session.required_min_rx / USEC_IN_SEC
960         before = time.time()
961         e = self.vapi.wait_for_event(
962             2 * detection_time, "bfd_udp_session_details")
963         after = time.time()
964         self.assert_in_range(after - before,
965                              0.9 * detection_time,
966                              1.1 * detection_time,
967                              "time before bfd session goes down")
968         verify_event(self, e, expected_state=BFDState.down)
969
970     @unittest.skipUnless(running_extended_tests, "part of extended tests")
971     def test_modify_detect_mult(self):
972         """ modify detect multiplier """
973         bfd_session_up(self)
974         p = wait_for_bfd_packet(self)
975         self.vpp_session.modify_parameters(detect_mult=1)
976         p = wait_for_bfd_packet(
977             self, pcap_time_min=time.time() - self.vpp_clock_offset)
978         self.assert_equal(self.vpp_session.detect_mult,
979                           p[BFD].detect_mult,
980                           "detect mult")
981         # poll bit must not be set
982         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
983                          "Poll bit not set in BFD packet")
984         self.vpp_session.modify_parameters(detect_mult=10)
985         p = wait_for_bfd_packet(
986             self, pcap_time_min=time.time() - self.vpp_clock_offset)
987         self.assert_equal(self.vpp_session.detect_mult,
988                           p[BFD].detect_mult,
989                           "detect mult")
990         # poll bit must not be set
991         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
992                          "Poll bit not set in BFD packet")
993
    @unittest.skipUnless(running_extended_tests, "part of extended tests")
    def test_queued_poll(self):
        """ test poll sequence queueing

        VPP's required_min_rx is doubled twice in a row. The first change
        starts a poll sequence which the test deliberately leaves unanswered
        for at least 0.5 seconds; the second change is expected to be queued
        and to show up as a separate poll sequence only after the first one
        has been finished with a final.
        """
        bfd_session_up(self)
        p = wait_for_bfd_packet(self)
        # first modification - starts poll sequence no. 1
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        p = wait_for_bfd_packet(self)
        poll_sequence_start = time.time()
        poll_sequence_length_min = 0.5
        send_final_after = time.time() + poll_sequence_length_min
        # poll bit needs to be set
        self.assertIn("P", p.sprintf("%BFD.flags%"),
                      "Poll bit not set in BFD packet")
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        # second modification while the first poll sequence is still open
        self.vpp_session.modify_parameters(
            required_min_rx=2 * self.vpp_session.required_min_rx)
        # 2nd poll sequence should be queued now
        # don't send the reply back yet, wait for some time to emulate
        # longer round-trip time
        packet_count = 0
        while time.time() < send_final_after:
            self.test_session.send_packet()
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            self.assert_equal(p[BFD].required_min_rx_interval,
                              self.vpp_session.required_min_rx,
                              "BFD required min rx interval")
            packet_count += 1
            # poll bit must be set
            self.assertIn("P", p.sprintf("%BFD.flags%"),
                          "Poll bit not set in BFD packet")
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        # finish 1st with final
        poll_sequence_length = time.time() - poll_sequence_start
        # vpp must wait for some time before starting new poll sequence
        poll_no_2_started = False
        # look at up to twice as many packets as were seen during the first
        # poll sequence
        for dummy in range(2 * packet_count):
            p = wait_for_bfd_packet(self)
            self.assert_equal(len(self.vapi.collect_events()), 0,
                              "number of bfd events")
            if "P" in p.sprintf("%BFD.flags%"):
                poll_no_2_started = True
                # NOTE(review): poll_sequence_start + poll_sequence_length is
                # the moment the first final was sent, which already lies in
                # the past here, so this check looks like it can never fire -
                # confirm the intended reference time
                if time.time() < poll_sequence_start + poll_sequence_length:
                    raise Exception("VPP started 2nd poll sequence too soon")
                final = self.test_session.create_packet()
                final[BFD].flags = "F"
                self.test_session.send_packet(final)
                break
            else:
                self.test_session.send_packet()
        self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
        # finish 2nd with final
        # NOTE(review): a final was already sent inside the loop above right
        # before the break, so this is a second final for the same sequence
        final = self.test_session.create_packet()
        final[BFD].flags = "F"
        self.test_session.send_packet(final)
        p = wait_for_bfd_packet(self)
        # poll bit must not be set
        self.assertNotIn("P", p.sprintf("%BFD.flags%"),
                         "Poll bit set in BFD packet")
1059
1060     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1061     def test_poll_response(self):
1062         """ test correct response to control frame with poll bit set """
1063         bfd_session_up(self)
1064         poll = self.test_session.create_packet()
1065         poll[BFD].flags = "P"
1066         self.test_session.send_packet(poll)
1067         final = wait_for_bfd_packet(
1068             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1069         self.assertIn("F", final.sprintf("%BFD.flags%"))
1070
1071     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1072     def test_no_periodic_if_remote_demand(self):
1073         """ no periodic frames outside poll sequence if remote demand set """
1074         bfd_session_up(self)
1075         demand = self.test_session.create_packet()
1076         demand[BFD].flags = "D"
1077         self.test_session.send_packet(demand)
1078         transmit_time = 0.9 \
1079             * max(self.vpp_session.required_min_rx,
1080                   self.test_session.desired_min_tx) \
1081             / USEC_IN_SEC
1082         count = 0
1083         for dummy in range(self.test_session.detect_mult * 2):
1084             self.sleep(transmit_time)
1085             self.test_session.send_packet(demand)
1086             try:
1087                 p = wait_for_bfd_packet(self, timeout=0)
1088                 self.logger.error(ppp("Received unexpected packet:", p))
1089                 count += 1
1090             except CaptureTimeoutError:
1091                 pass
1092         events = self.vapi.collect_events()
1093         for e in events:
1094             self.logger.error("Received unexpected event: %s", e)
1095         self.assert_equal(count, 0, "number of packets received")
1096         self.assert_equal(len(events), 0, "number of events received")
1097
1098     def test_echo_looped_back(self):
1099         """ echo packets looped back """
1100         # don't need a session in this case..
1101         self.vpp_session.remove_vpp_config()
1102         self.pg0.enable_capture()
1103         echo_packet_count = 10
1104         # random source port low enough to increment a few times..
1105         udp_sport_tx = randint(1, 50000)
1106         udp_sport_rx = udp_sport_tx
1107         echo_packet = (Ether(src=self.pg0.remote_mac,
1108                              dst=self.pg0.local_mac) /
1109                        IP(src=self.pg0.remote_ip4,
1110                           dst=self.pg0.remote_ip4) /
1111                        UDP(dport=BFD.udp_dport_echo) /
1112                        Raw("this should be looped back"))
1113         for dummy in range(echo_packet_count):
1114             self.sleep(.01, "delay between echo packets")
1115             echo_packet[UDP].sport = udp_sport_tx
1116             udp_sport_tx += 1
1117             self.logger.debug(ppp("Sending packet:", echo_packet))
1118             self.pg0.add_stream(echo_packet)
1119             self.pg_start()
1120         for dummy in range(echo_packet_count):
1121             p = self.pg0.wait_for_packet(1)
1122             self.logger.debug(ppp("Got packet:", p))
1123             ether = p[Ether]
1124             self.assert_equal(self.pg0.remote_mac,
1125                               ether.dst, "Destination MAC")
1126             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1127             ip = p[IP]
1128             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1129             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1130             udp = p[UDP]
1131             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1132                               "UDP destination port")
1133             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1134             udp_sport_rx += 1
1135             # need to compare the hex payload here, otherwise BFD_vpp_echo
1136             # gets in way
1137             self.assertEqual(scapy.compat.raw(p[UDP].payload),
1138                              scapy.compat.raw(echo_packet[UDP].payload),
1139                              "Received packet is not the echo packet sent")
1140         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1141                           "ECHO packet identifier for test purposes)")
1142
1143     def test_echo(self):
1144         """ echo function """
1145         bfd_session_up(self)
1146         self.test_session.update(required_min_echo_rx=150000)
1147         self.test_session.send_packet()
1148         detection_time = self.test_session.detect_mult *\
1149             self.vpp_session.required_min_rx / USEC_IN_SEC
1150         # echo shouldn't work without echo source set
1151         for dummy in range(10):
1152             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1153             self.sleep(sleep, "delay before sending bfd packet")
1154             self.test_session.send_packet()
1155         p = wait_for_bfd_packet(
1156             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1157         self.assert_equal(p[BFD].required_min_rx_interval,
1158                           self.vpp_session.required_min_rx,
1159                           "BFD required min rx interval")
1160         self.test_session.send_packet()
1161         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1162         echo_seen = False
1163         # should be turned on - loopback echo packets
1164         for dummy in range(3):
1165             loop_until = time.time() + 0.75 * detection_time
1166             while time.time() < loop_until:
1167                 p = self.pg0.wait_for_packet(1)
1168                 self.logger.debug(ppp("Got packet:", p))
1169                 if p[UDP].dport == BFD.udp_dport_echo:
1170                     self.assert_equal(
1171                         p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1172                     self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1173                                         "BFD ECHO src IP equal to loopback IP")
1174                     self.logger.debug(ppp("Looping back packet:", p))
1175                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1176                                       "ECHO packet destination MAC address")
1177                     p[Ether].dst = self.pg0.local_mac
1178                     self.pg0.add_stream(p)
1179                     self.pg_start()
1180                     echo_seen = True
1181                 elif p.haslayer(BFD):
1182                     if echo_seen:
1183                         self.assertGreaterEqual(
1184                             p[BFD].required_min_rx_interval,
1185                             1000000)
1186                     if "P" in p.sprintf("%BFD.flags%"):
1187                         final = self.test_session.create_packet()
1188                         final[BFD].flags = "F"
1189                         self.test_session.send_packet(final)
1190                 else:
1191                     raise Exception(ppp("Received unknown packet:", p))
1192
1193                 self.assert_equal(len(self.vapi.collect_events()), 0,
1194                                   "number of bfd events")
1195             self.test_session.send_packet()
1196         self.assertTrue(echo_seen, "No echo packets received")
1197
1198     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1199     def test_echo_fail(self):
1200         """ session goes down if echo function fails """
1201         bfd_session_up(self)
1202         self.test_session.update(required_min_echo_rx=150000)
1203         self.test_session.send_packet()
1204         detection_time = self.test_session.detect_mult *\
1205             self.vpp_session.required_min_rx / USEC_IN_SEC
1206         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1207         # echo function should be used now, but we will drop the echo packets
1208         verified_diag = False
1209         for dummy in range(3):
1210             loop_until = time.time() + 0.75 * detection_time
1211             while time.time() < loop_until:
1212                 p = self.pg0.wait_for_packet(1)
1213                 self.logger.debug(ppp("Got packet:", p))
1214                 if p[UDP].dport == BFD.udp_dport_echo:
1215                     # dropped
1216                     pass
1217                 elif p.haslayer(BFD):
1218                     if "P" in p.sprintf("%BFD.flags%"):
1219                         self.assertGreaterEqual(
1220                             p[BFD].required_min_rx_interval,
1221                             1000000)
1222                         final = self.test_session.create_packet()
1223                         final[BFD].flags = "F"
1224                         self.test_session.send_packet(final)
1225                     if p[BFD].state == BFDState.down:
1226                         self.assert_equal(p[BFD].diag,
1227                                           BFDDiagCode.echo_function_failed,
1228                                           BFDDiagCode)
1229                         verified_diag = True
1230                 else:
1231                     raise Exception(ppp("Received unknown packet:", p))
1232             self.test_session.send_packet()
1233         events = self.vapi.collect_events()
1234         self.assert_equal(len(events), 1, "number of bfd events")
1235         self.assert_equal(events[0].state, BFDState.down, BFDState)
1236         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1237
1238     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1239     def test_echo_stop(self):
1240         """ echo function stops if peer sets required min echo rx zero """
1241         bfd_session_up(self)
1242         self.test_session.update(required_min_echo_rx=150000)
1243         self.test_session.send_packet()
1244         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1245         # wait for first echo packet
1246         while True:
1247             p = self.pg0.wait_for_packet(1)
1248             self.logger.debug(ppp("Got packet:", p))
1249             if p[UDP].dport == BFD.udp_dport_echo:
1250                 self.logger.debug(ppp("Looping back packet:", p))
1251                 p[Ether].dst = self.pg0.local_mac
1252                 self.pg0.add_stream(p)
1253                 self.pg_start()
1254                 break
1255             elif p.haslayer(BFD):
1256                 # ignore BFD
1257                 pass
1258             else:
1259                 raise Exception(ppp("Received unknown packet:", p))
1260         self.test_session.update(required_min_echo_rx=0)
1261         self.test_session.send_packet()
1262         # echo packets shouldn't arrive anymore
1263         for dummy in range(5):
1264             wait_for_bfd_packet(
1265                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1266             self.test_session.send_packet()
1267             events = self.vapi.collect_events()
1268             self.assert_equal(len(events), 0, "number of bfd events")
1269
1270     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1271     def test_echo_source_removed(self):
1272         """ echo function stops if echo source is removed """
1273         bfd_session_up(self)
1274         self.test_session.update(required_min_echo_rx=150000)
1275         self.test_session.send_packet()
1276         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1277         # wait for first echo packet
1278         while True:
1279             p = self.pg0.wait_for_packet(1)
1280             self.logger.debug(ppp("Got packet:", p))
1281             if p[UDP].dport == BFD.udp_dport_echo:
1282                 self.logger.debug(ppp("Looping back packet:", p))
1283                 p[Ether].dst = self.pg0.local_mac
1284                 self.pg0.add_stream(p)
1285                 self.pg_start()
1286                 break
1287             elif p.haslayer(BFD):
1288                 # ignore BFD
1289                 pass
1290             else:
1291                 raise Exception(ppp("Received unknown packet:", p))
1292         self.vapi.bfd_udp_del_echo_source()
1293         self.test_session.send_packet()
1294         # echo packets shouldn't arrive anymore
1295         for dummy in range(5):
1296             wait_for_bfd_packet(
1297                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1298             self.test_session.send_packet()
1299             events = self.vapi.collect_events()
1300             self.assert_equal(len(events), 0, "number of bfd events")
1301
1302     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1303     def test_stale_echo(self):
1304         """ stale echo packets don't keep a session up """
1305         bfd_session_up(self)
1306         self.test_session.update(required_min_echo_rx=150000)
1307         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1308         self.test_session.send_packet()
1309         # should be turned on - loopback echo packets
1310         echo_packet = None
1311         timeout_at = None
1312         timeout_ok = False
1313         for dummy in range(10 * self.vpp_session.detect_mult):
1314             p = self.pg0.wait_for_packet(1)
1315             if p[UDP].dport == BFD.udp_dport_echo:
1316                 if echo_packet is None:
1317                     self.logger.debug(ppp("Got first echo packet:", p))
1318                     echo_packet = p
1319                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1320                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1321                 else:
1322                     self.logger.debug(ppp("Got followup echo packet:", p))
1323                 self.logger.debug(ppp("Looping back first echo packet:", p))
1324                 echo_packet[Ether].dst = self.pg0.local_mac
1325                 self.pg0.add_stream(echo_packet)
1326                 self.pg_start()
1327             elif p.haslayer(BFD):
1328                 self.logger.debug(ppp("Got packet:", p))
1329                 if "P" in p.sprintf("%BFD.flags%"):
1330                     final = self.test_session.create_packet()
1331                     final[BFD].flags = "F"
1332                     self.test_session.send_packet(final)
1333                 if p[BFD].state == BFDState.down:
1334                     self.assertIsNotNone(
1335                         timeout_at,
1336                         "Session went down before first echo packet received")
1337                     now = time.time()
1338                     self.assertGreaterEqual(
1339                         now, timeout_at,
1340                         "Session timeout at %s, but is expected at %s" %
1341                         (now, timeout_at))
1342                     self.assert_equal(p[BFD].diag,
1343                                       BFDDiagCode.echo_function_failed,
1344                                       BFDDiagCode)
1345                     events = self.vapi.collect_events()
1346                     self.assert_equal(len(events), 1, "number of bfd events")
1347                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1348                     timeout_ok = True
1349                     break
1350             else:
1351                 raise Exception(ppp("Received unknown packet:", p))
1352             self.test_session.send_packet()
1353         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1354
1355     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1356     def test_invalid_echo_checksum(self):
1357         """ echo packets with invalid checksum don't keep a session up """
1358         bfd_session_up(self)
1359         self.test_session.update(required_min_echo_rx=150000)
1360         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1361         self.test_session.send_packet()
1362         # should be turned on - loopback echo packets
1363         timeout_at = None
1364         timeout_ok = False
1365         for dummy in range(10 * self.vpp_session.detect_mult):
1366             p = self.pg0.wait_for_packet(1)
1367             if p[UDP].dport == BFD.udp_dport_echo:
1368                 self.logger.debug(ppp("Got echo packet:", p))
1369                 if timeout_at is None:
1370                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1371                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1372                 p[BFD_vpp_echo].checksum = getrandbits(64)
1373                 p[Ether].dst = self.pg0.local_mac
1374                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1375                 self.pg0.add_stream(p)
1376                 self.pg_start()
1377             elif p.haslayer(BFD):
1378                 self.logger.debug(ppp("Got packet:", p))
1379                 if "P" in p.sprintf("%BFD.flags%"):
1380                     final = self.test_session.create_packet()
1381                     final[BFD].flags = "F"
1382                     self.test_session.send_packet(final)
1383                 if p[BFD].state == BFDState.down:
1384                     self.assertIsNotNone(
1385                         timeout_at,
1386                         "Session went down before first echo packet received")
1387                     now = time.time()
1388                     self.assertGreaterEqual(
1389                         now, timeout_at,
1390                         "Session timeout at %s, but is expected at %s" %
1391                         (now, timeout_at))
1392                     self.assert_equal(p[BFD].diag,
1393                                       BFDDiagCode.echo_function_failed,
1394                                       BFDDiagCode)
1395                     events = self.vapi.collect_events()
1396                     self.assert_equal(len(events), 1, "number of bfd events")
1397                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1398                     timeout_ok = True
1399                     break
1400             else:
1401                 raise Exception(ppp("Received unknown packet:", p))
1402             self.test_session.send_packet()
1403         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1404
1405     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1406     def test_admin_up_down(self):
1407         """ put session admin-up and admin-down """
1408         bfd_session_up(self)
1409         self.vpp_session.admin_down()
1410         self.pg0.enable_capture()
1411         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1412         verify_event(self, e, expected_state=BFDState.admin_down)
1413         for dummy in range(2):
1414             p = wait_for_bfd_packet(self)
1415             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1416         # try to bring session up - shouldn't be possible
1417         self.test_session.update(state=BFDState.init)
1418         self.test_session.send_packet()
1419         for dummy in range(2):
1420             p = wait_for_bfd_packet(self)
1421             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1422         self.vpp_session.admin_up()
1423         self.test_session.update(state=BFDState.down)
1424         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1425         verify_event(self, e, expected_state=BFDState.down)
1426         p = wait_for_bfd_packet(
1427             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1428         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1429         self.test_session.send_packet()
1430         p = wait_for_bfd_packet(
1431             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1432         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1433         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1434         verify_event(self, e, expected_state=BFDState.init)
1435         self.test_session.update(state=BFDState.up)
1436         self.test_session.send_packet()
1437         p = wait_for_bfd_packet(
1438             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1439         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1440         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1441         verify_event(self, e, expected_state=BFDState.up)
1442
1443     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1444     def test_config_change_remote_demand(self):
1445         """ configuration change while peer in demand mode """
1446         bfd_session_up(self)
1447         demand = self.test_session.create_packet()
1448         demand[BFD].flags = "D"
1449         self.test_session.send_packet(demand)
1450         self.vpp_session.modify_parameters(
1451             required_min_rx=2 * self.vpp_session.required_min_rx)
1452         p = wait_for_bfd_packet(
1453             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1454         # poll bit must be set
1455         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1456         # terminate poll sequence
1457         final = self.test_session.create_packet()
1458         final[BFD].flags = "D+F"
1459         self.test_session.send_packet(final)
1460         # vpp should be quiet now again
1461         transmit_time = 0.9 \
1462             * max(self.vpp_session.required_min_rx,
1463                   self.test_session.desired_min_tx) \
1464             / USEC_IN_SEC
1465         count = 0
1466         for dummy in range(self.test_session.detect_mult * 2):
1467             self.sleep(transmit_time)
1468             self.test_session.send_packet(demand)
1469             try:
1470                 p = wait_for_bfd_packet(self, timeout=0)
1471                 self.logger.error(ppp("Received unexpected packet:", p))
1472                 count += 1
1473             except CaptureTimeoutError:
1474                 pass
1475         events = self.vapi.collect_events()
1476         for e in events:
1477             self.logger.error("Received unexpected event: %s", e)
1478         self.assert_equal(count, 0, "number of packets received")
1479         self.assert_equal(len(events), 0, "number of events received")
1480
1481     def test_intf_deleted(self):
1482         """ interface with bfd session deleted """
1483         intf = VppLoInterface(self)
1484         intf.config_ip4()
1485         intf.admin_up()
1486         sw_if_index = intf.sw_if_index
1487         vpp_session = VppBFDUDPSession(self, intf, intf.remote_ip4)
1488         vpp_session.add_vpp_config()
1489         vpp_session.admin_up()
1490         intf.remove_vpp_config()
1491         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1492         self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
1493         self.assertFalse(vpp_session.query_vpp_config())
1494
1495
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFD6TestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (IPv6) """

    # class-level defaults so that the attributes always exist;
    # vpp_session and test_session are assigned in setUp()
    pg0 = None
    vpp_clock_offset = None
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ set up pg0 and loopback0 with IPv6 for the whole class """
        super(BFD6TestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip6()
            cls.pg0.configure_ipv6_neighbors()
            cls.pg0.admin_up()
            cls.pg0.resolve_ndp()
            cls.create_loopback_interfaces(1)
            cls.loopback0 = cls.lo_interfaces[0]
            cls.loopback0.config_ip6()
            cls.loopback0.admin_up()

        except Exception:
            # setup failed - run the parent class teardown, then re-raise
            super(BFD6TestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ delegate to the parent class teardown """
        super(BFD6TestCase, cls).tearDownClass()

    def setUp(self):
        """ enable BFD events and create the VPP and test side sessions """
        super(BFD6TestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
        try:
            self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                                self.pg0.remote_ip6,
                                                af=AF_INET6)
            self.vpp_session.add_vpp_config()
            self.vpp_session.admin_up()
            self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
            self.logger.debug(self.vapi.cli("show adj nbr"))
        except:
            # deliberately broad: undo the event registration on any
            # failure, then re-raise the original error
            self.vapi.want_bfd_events(enable_disable=0)
            raise

    def tearDown(self):
        """ disable BFD events (if VPP is alive) and drain the event queue """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFD6TestCase, self).tearDown()

    def test_session_up(self):
        """ bring BFD session up """
        bfd_session_up(self)

    def test_session_up_by_ip(self):
        """ bring BFD session up - first frame looked up by address pair """
        self.logger.info("BFD: Sending Slow control frame")
        self.test_session.update(my_discriminator=randint(0, 40000000))
        self.test_session.send_packet()
        self.pg0.enable_capture()
        p = self.pg0.wait_for_packet(1)
        self.assert_equal(p[BFD].your_discriminator,
                          self.test_session.my_discriminator,
                          "BFD - your discriminator")
        self.assert_equal(p[BFD].state, BFDState.init, BFDState)
        self.test_session.update(your_discriminator=p[BFD].my_discriminator,
                                 state=BFDState.up)
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.init)
        self.logger.info("BFD: Sending Up")
        self.test_session.send_packet()
        self.logger.info("BFD: Waiting for event")
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        verify_event(self, e, expected_state=BFDState.up)
        self.logger.info("BFD: Session is Up")
        self.test_session.update(state=BFDState.up)
        self.test_session.send_packet()
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    @unittest.skipUnless(running_extended_tests, "part of extended tests")
    def test_hold_up(self):
        """ hold BFD session up """
        bfd_session_up(self)
        for dummy in range(self.test_session.detect_mult * 2):
            wait_for_bfd_packet(self)
            self.test_session.send_packet()
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)

    def test_echo_looped_back(self):
        """ echo packets looped back """
        # don't need a session in this case..
        self.vpp_session.remove_vpp_config()
        self.pg0.enable_capture()
        echo_packet_count = 10
        # random source port low enough to increment a few times..
        udp_sport_tx = randint(1, 50000)
        udp_sport_rx = udp_sport_tx
        # source and destination address are both the remote address
        echo_packet = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=self.pg0.remote_ip6,
                            dst=self.pg0.remote_ip6) /
                       UDP(dport=BFD.udp_dport_echo) /
                       Raw("this should be looped back"))
        for dummy in range(echo_packet_count):
            self.sleep(.01, "delay between echo packets")
            # the source port doubles as a per-packet identifier
            echo_packet[UDP].sport = udp_sport_tx
            udp_sport_tx += 1
            self.logger.debug(ppp("Sending packet:", echo_packet))
            self.pg0.add_stream(echo_packet)
            self.pg_start()
        for dummy in range(echo_packet_count):
            p = self.pg0.wait_for_packet(1)
            self.logger.debug(ppp("Got packet:", p))
            ether = p[Ether]
            self.assert_equal(self.pg0.remote_mac,
                              ether.dst, "Destination MAC")
            self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
            ip = p[IPv6]
            self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
            self.assert_equal(self.pg0.remote_ip6, ip.src, "Source IP")
            udp = p[UDP]
            self.assert_equal(udp.dport, BFD.udp_dport_echo,
                              "UDP destination port")
            self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
            udp_sport_rx += 1
            # need to compare the hex payload here, otherwise BFD_vpp_echo
            # gets in way
            self.assertEqual(scapy.compat.raw(p[UDP].payload),
                             scapy.compat.raw(echo_packet[UDP].payload),
                             "Received packet is not the echo packet sent")
        # every packet sent must have been received back
        self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
                          "ECHO packet identifier for test purposes)")

    def test_echo(self):
        """ echo function """
        bfd_session_up(self)
        self.test_session.update(required_min_echo_rx=150000)
        self.test_session.send_packet()
        detection_time = self.test_session.detect_mult *\
            self.vpp_session.required_min_rx / USEC_IN_SEC
        # echo shouldn't work without echo source set
        for dummy in range(10):
            sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
            self.sleep(sleep, "delay before sending bfd packet")
            self.test_session.send_packet()
        p = wait_for_bfd_packet(
            self, pcap_time_min=time.time() - self.vpp_clock_offset)
        self.assert_equal(p[BFD].required_min_rx_interval,
                          self.vpp_session.required_min_rx,
                          "BFD required min rx interval")
        self.test_session.send_packet()
        self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
        echo_seen = False
        # should be turned on - loopback echo packets
        for dummy in range(3):
            loop_until = time.time() + 0.75 * detection_time
            while time.time() < loop_until:
                p = self.pg0.wait_for_packet(1)
                self.logger.debug(ppp("Got packet:", p))
                if p[UDP].dport == BFD.udp_dport_echo:
                    self.assert_equal(
                        p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
                    self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
                                        "BFD ECHO src IP equal to loopback IP")
                    self.logger.debug(ppp("Looping back packet:", p))
                    self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
                                      "ECHO packet destination MAC address")
                    p[Ether].dst = self.pg0.local_mac
                    self.pg0.add_stream(p)
                    self.pg_start()
                    echo_seen = True
                elif p.haslayer(BFD):
                    if echo_seen:
                        # once echo packets were seen, control packets are
                        # expected to advertise at least 1000000 here
                        self.assertGreaterEqual(
                            p[BFD].required_min_rx_interval,
                            1000000)
                    if "P" in p.sprintf("%BFD.flags%"):
                        # answer a poll with a final
                        final = self.test_session.create_packet()
                        final[BFD].flags = "F"
                        self.test_session.send_packet(final)
                else:
                    raise Exception(ppp("Received unknown packet:", p))

                self.assert_equal(len(self.vapi.collect_events()), 0,
                                  "number of bfd events")
            self.test_session.send_packet()
        self.assertTrue(echo_seen, "No echo packets received")

    def test_intf_deleted(self):
        """ interface with bfd session deleted """
        intf = VppLoInterface(self)
        intf.config_ip6()
        intf.admin_up()
        # keep the index for comparison with the event received after the
        # interface is removed
        sw_if_index = intf.sw_if_index
        vpp_session = VppBFDUDPSession(
            self, intf, intf.remote_ip6, af=AF_INET6)
        vpp_session.add_vpp_config()
        vpp_session.admin_up()
        intf.remove_vpp_config()
        e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
        self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
        self.assertFalse(vpp_session.query_vpp_config())
1708
1709
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDFIBTestCase(VppTestCase):
    """ BFD-FIB interactions (IPv6) """

    # both sessions are created inside the test itself
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDFIBTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(BFDFIBTestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDFIBTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        # bring every pg interface up with IPv6 and its neighbors configured
        for pg_if in self.pg_interfaces:
            pg_if.admin_up()
            pg_if.config_ip6()
            pg_if.configure_ipv6_neighbors()

    def tearDown(self):
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDFIBTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_session_with_fib(self):
        """ BFD-FIB interactions """

        # one data packet per route installed below
        p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IPv6(src="3001::1", dst=dst_addr) /
              UDP(sport=1234, dport=1234) /
              Raw('\xa5' * 100))
             for dst_addr in ("2001::1", "2002::1")]

        def expect_traffic_passes():
            # send the data packets and expect each of them to be captured,
            # skipping over BFD and IPv6 ND/RA packets
            self.pg0.add_stream(p)
            self.pg_start()
            for sent in p:
                received = self.pg0.wait_for_packet(
                    1,
                    filter_out_fn=self.pkt_is_not_data_traffic)
                self.assertEqual(received[IPv6].dst,
                                 sent[IPv6].dst)

        # two routes via the next-hop which gets the BFD session: the first
        # path names the interface, the second passes 0xffffffff instead
        next_hop = self.pg0.remote_ip6
        ip_2001_s_64 = VppIpRoute(
            self, "2001::", 64,
            [VppRoutePath(next_hop, self.pg0.sw_if_index)])
        ip_2002_s_64 = VppIpRoute(
            self, "2002::", 64,
            [VppRoutePath(next_hop, 0xffffffff)])
        ip_2001_s_64.add_vpp_config()
        ip_2002_s_64.add_vpp_config()

        # with the routes in place, create both ends of the session
        self.vpp_session = VppBFDUDPSession(
            self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET6)

        # session is up - traffic passes
        bfd_session_up(self)
        expect_traffic_passes()

        # session is down - traffic is dropped
        bfd_session_down(self)

        self.pg0.add_stream(p)
        self.pg_start()
        with self.assertRaises(CaptureTimeoutError):
            self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)

        # session is up again - traffic passes
        bfd_session_up(self)
        expect_traffic_passes()
1814
1815
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDTunTestCase(VppTestCase):
    """ BFD over GRE tunnel """

    # both sessions are created inside the test itself
    vpp_session = None
    test_session = None

    @classmethod
    def setUpClass(cls):
        super(BFDTunTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(BFDTunTestCase, cls).tearDownClass()

    def setUp(self):
        super(BFDTunTestCase, self).setUp()
        self.create_pg_interfaces(range(1))

        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

        # bring every pg interface up with IPv4 configured and ARP resolved
        for pg_if in self.pg_interfaces:
            pg_if.admin_up()
            pg_if.config_ip4()
            pg_if.resolve_arp()

    def tearDown(self):
        vpp_alive = not self.vpp_dead
        if vpp_alive:
            self.vapi.want_bfd_events(enable_disable=0)

        super(BFDTunTestCase, self).tearDown()

    @staticmethod
    def pkt_is_not_data_traffic(p):
        """ not data traffic implies BFD or the usual IPv6 ND/RA"""
        return bool(p.haslayer(BFD) or is_ipv6_misc(p))

    def test_bfd_o_gre(self):
        """ BFD-o-GRE  """

        pg0 = self.pg0

        # A GRE interface over which to run a BFD session
        gre_if = VppGreInterface(self, pg0.local_ip4, pg0.remote_ip4)
        gre_if.add_vpp_config()
        gre_if.admin_up()
        gre_if.config_ip4()

        # VPP side of the session runs on the tunnel interface
        self.vpp_session = VppBFDUDPSession(
            self, gre_if, gre_if.remote_ip4, is_tunnel=True)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()

        # test side: BFD packets carry an IP/GRE tunnel header and use pg0
        # as the physical interface
        outer_header = IP(src=pg0.remote_ip4, dst=pg0.local_ip4) / GRE()
        self.test_session = BFDTestSession(
            self, gre_if, AF_INET,
            tunnel_header=outer_header,
            phy_interface=pg0)

        # a single data packet addressed to the tunnel's remote address
        data_packet = (Ether(dst=pg0.local_mac, src=pg0.remote_mac) /
                       IP(src=pg0.remote_ip4, dst=gre_if.remote_ip4) /
                       UDP(sport=1234, dport=1234) /
                       Raw('\xa5' * 100))
        p = [data_packet]

        # session is up - traffic passes
        bfd_session_up(self)

        self.send_and_expect(pg0, p, pg0)

        # bring session down
        bfd_session_down(self)
1895
1896
1897 @unittest.skipUnless(running_extended_tests, "part of extended tests")
1898 class BFDSHA1TestCase(VppTestCase):
1899     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1900
1901     pg0 = None
1902     vpp_clock_offset = None
1903     vpp_session = None
1904     test_session = None
1905
    @classmethod
    def setUpClass(cls):
        """ set up pg0 with IPv4 for the whole class

        The parent class teardown is run and the error re-raised if any of
        the interface setup steps fails.
        """
        super(BFDSHA1TestCase, cls).setUpClass()
        # raise the bfd log level to debug via the CLI
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # setup failed - run the parent class teardown, then re-raise
            super(BFDSHA1TestCase, cls).tearDownClass()
            raise
1919
    @classmethod
    def tearDownClass(cls):
        """ delegate to the parent class teardown """
        super(BFDSHA1TestCase, cls).tearDownClass()
1923
    def setUp(self):
        """ per-test setup: key factory, BFD events and capture on pg0

        No session is created here; each test builds its own.
        """
        super(BFDSHA1TestCase, self).setUp()
        # a new factory per test, used by the tests to create auth keys
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()
1929
    def tearDown(self):
        """ disable BFD events (if VPP is alive) and drain the event queue """
        # only talk to VPP about event registration when it is still running
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDSHA1TestCase, self).tearDown()
1935
1936     def test_session_up(self):
1937         """ bring BFD session up """
1938         key = self.factory.create_random_key(self)
1939         key.add_vpp_config()
1940         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1941                                             self.pg0.remote_ip4,
1942                                             sha1_key=key)
1943         self.vpp_session.add_vpp_config()
1944         self.vpp_session.admin_up()
1945         self.test_session = BFDTestSession(
1946             self, self.pg0, AF_INET, sha1_key=key,
1947             bfd_key_id=self.vpp_session.bfd_key_id)
1948         bfd_session_up(self)
1949
1950     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1951     def test_hold_up(self):
1952         """ hold BFD session up """
1953         key = self.factory.create_random_key(self)
1954         key.add_vpp_config()
1955         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1956                                             self.pg0.remote_ip4,
1957                                             sha1_key=key)
1958         self.vpp_session.add_vpp_config()
1959         self.vpp_session.admin_up()
1960         self.test_session = BFDTestSession(
1961             self, self.pg0, AF_INET, sha1_key=key,
1962             bfd_key_id=self.vpp_session.bfd_key_id)
1963         bfd_session_up(self)
1964         for dummy in range(self.test_session.detect_mult * 2):
1965             wait_for_bfd_packet(self)
1966             self.test_session.send_packet()
1967         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1968
1969     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1970     def test_hold_up_meticulous(self):
1971         """ hold BFD session up - meticulous auth """
1972         key = self.factory.create_random_key(
1973             self, BFDAuthType.meticulous_keyed_sha1)
1974         key.add_vpp_config()
1975         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1976                                             self.pg0.remote_ip4, sha1_key=key)
1977         self.vpp_session.add_vpp_config()
1978         self.vpp_session.admin_up()
1979         # specify sequence number so that it wraps
1980         self.test_session = BFDTestSession(
1981             self, self.pg0, AF_INET, sha1_key=key,
1982             bfd_key_id=self.vpp_session.bfd_key_id,
1983             our_seq_number=0xFFFFFFFF - 4)
1984         bfd_session_up(self)
1985         for dummy in range(30):
1986             wait_for_bfd_packet(self)
1987             self.test_session.inc_seq_num()
1988             self.test_session.send_packet()
1989         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1990
1991     @unittest.skipUnless(running_extended_tests, "part of extended tests")
1992     def test_send_bad_seq_number(self):
1993         """ session is not kept alive by msgs with bad sequence numbers"""
1994         key = self.factory.create_random_key(
1995             self, BFDAuthType.meticulous_keyed_sha1)
1996         key.add_vpp_config()
1997         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1998                                             self.pg0.remote_ip4, sha1_key=key)
1999         self.vpp_session.add_vpp_config()
2000         self.test_session = BFDTestSession(
2001             self, self.pg0, AF_INET, sha1_key=key,
2002             bfd_key_id=self.vpp_session.bfd_key_id)
2003         bfd_session_up(self)
2004         detection_time = self.test_session.detect_mult *\
2005             self.vpp_session.required_min_rx / USEC_IN_SEC
2006         send_until = time.time() + 2 * detection_time
2007         while time.time() < send_until:
2008             self.test_session.send_packet()
2009             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
2010                        "time between bfd packets")
2011         e = self.vapi.collect_events()
2012         # session should be down now, because the sequence numbers weren't
2013         # updated
2014         self.assert_equal(len(e), 1, "number of bfd events")
2015         verify_event(self, e[0], expected_state=BFDState.down)
2016
2017     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
2018                                        legitimate_test_session,
2019                                        rogue_test_session,
2020                                        rogue_bfd_values=None):
2021         """ execute a rogue session interaction scenario
2022
2023         1. create vpp session, add config
2024         2. bring the legitimate session up
2025         3. copy the bfd values from legitimate session to rogue session
2026         4. apply rogue_bfd_values to rogue session
2027         5. set rogue session state to down
2028         6. send message to take the session down from the rogue session
2029         7. assert that the legitimate session is unaffected
2030         """
2031
2032         self.vpp_session = vpp_bfd_udp_session
2033         self.vpp_session.add_vpp_config()
2034         self.test_session = legitimate_test_session
2035         # bring vpp session up
2036         bfd_session_up(self)
2037         # send packet from rogue session
2038         rogue_test_session.update(
2039             my_discriminator=self.test_session.my_discriminator,
2040             your_discriminator=self.test_session.your_discriminator,
2041             desired_min_tx=self.test_session.desired_min_tx,
2042             required_min_rx=self.test_session.required_min_rx,
2043             detect_mult=self.test_session.detect_mult,
2044             diag=self.test_session.diag,
2045             state=self.test_session.state,
2046             auth_type=self.test_session.auth_type)
2047         if rogue_bfd_values:
2048             rogue_test_session.update(**rogue_bfd_values)
2049         rogue_test_session.update(state=BFDState.down)
2050         rogue_test_session.send_packet()
2051         wait_for_bfd_packet(self)
2052         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2053
2054     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2055     def test_mismatch_auth(self):
2056         """ session is not brought down by unauthenticated msg """
2057         key = self.factory.create_random_key(self)
2058         key.add_vpp_config()
2059         vpp_session = VppBFDUDPSession(
2060             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2061         legitimate_test_session = BFDTestSession(
2062             self, self.pg0, AF_INET, sha1_key=key,
2063             bfd_key_id=vpp_session.bfd_key_id)
2064         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
2065         self.execute_rogue_session_scenario(vpp_session,
2066                                             legitimate_test_session,
2067                                             rogue_test_session)
2068
2069     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2070     def test_mismatch_bfd_key_id(self):
2071         """ session is not brought down by msg with non-existent key-id """
2072         key = self.factory.create_random_key(self)
2073         key.add_vpp_config()
2074         vpp_session = VppBFDUDPSession(
2075             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2076         # pick a different random bfd key id
2077         x = randint(0, 255)
2078         while x == vpp_session.bfd_key_id:
2079             x = randint(0, 255)
2080         legitimate_test_session = BFDTestSession(
2081             self, self.pg0, AF_INET, sha1_key=key,
2082             bfd_key_id=vpp_session.bfd_key_id)
2083         rogue_test_session = BFDTestSession(
2084             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
2085         self.execute_rogue_session_scenario(vpp_session,
2086                                             legitimate_test_session,
2087                                             rogue_test_session)
2088
2089     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2090     def test_mismatched_auth_type(self):
2091         """ session is not brought down by msg with wrong auth type """
2092         key = self.factory.create_random_key(self)
2093         key.add_vpp_config()
2094         vpp_session = VppBFDUDPSession(
2095             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2096         legitimate_test_session = BFDTestSession(
2097             self, self.pg0, AF_INET, sha1_key=key,
2098             bfd_key_id=vpp_session.bfd_key_id)
2099         rogue_test_session = BFDTestSession(
2100             self, self.pg0, AF_INET, sha1_key=key,
2101             bfd_key_id=vpp_session.bfd_key_id)
2102         self.execute_rogue_session_scenario(
2103             vpp_session, legitimate_test_session, rogue_test_session,
2104             {'auth_type': BFDAuthType.keyed_md5})
2105
2106     @unittest.skipUnless(running_extended_tests, "part of extended tests")
2107     def test_restart(self):
2108         """ simulate remote peer restart and resynchronization """
2109         key = self.factory.create_random_key(
2110             self, BFDAuthType.meticulous_keyed_sha1)
2111         key.add_vpp_config()
2112         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2113                                             self.pg0.remote_ip4, sha1_key=key)
2114         self.vpp_session.add_vpp_config()
2115         self.test_session = BFDTestSession(
2116             self, self.pg0, AF_INET, sha1_key=key,
2117             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
2118         bfd_session_up(self)
2119         # don't send any packets for 2*detection_time
2120         detection_time = self.test_session.detect_mult *\
2121             self.vpp_session.required_min_rx / USEC_IN_SEC
2122         self.sleep(2 * detection_time, "simulating peer restart")
2123         events = self.vapi.collect_events()
2124         self.assert_equal(len(events), 1, "number of bfd events")
2125         verify_event(self, events[0], expected_state=BFDState.down)
2126         self.test_session.update(state=BFDState.down)
2127         # reset sequence number
2128         self.test_session.our_seq_number = 0
2129         self.test_session.vpp_seq_number = None
2130         # now throw away any pending packets
2131         self.pg0.enable_capture()
2132         bfd_session_up(self)
2133
2134
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDAuthOnOffTestCase(VppTestCase):
    """Bidirectional Forwarding Detection (BFD) (changing auth) """

    # interface object, assigned once create_pg_interfaces() ran
    pg0 = None
    # VppBFDUDPSession of the currently running test
    vpp_session = None
    # BFDTestSession (test-side peer) of the currently running test
    test_session = None

    @classmethod
    def setUpClass(cls):
        """ enable bfd debug logging and bring up pg0 with IPv4 """
        super(BFDAuthOnOffTestCase, cls).setUpClass()
        cls.vapi.cli("set log class bfd level debug")
        try:
            cls.create_pg_interfaces([0])
            cls.pg0.config_ip4()
            cls.pg0.admin_up()
            cls.pg0.resolve_arp()

        except Exception:
            # run the parent's class teardown before propagating the error
            super(BFDAuthOnOffTestCase, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ delegate class-level cleanup to the parent class """
        super(BFDAuthOnOffTestCase, cls).tearDownClass()

    def setUp(self):
        """ create a fresh key factory, subscribe to bfd events and
        enable capture on pg0 """
        super(BFDAuthOnOffTestCase, self).setUp()
        self.factory = AuthKeyFactory()
        self.vapi.want_bfd_events()
        self.pg0.enable_capture()

    def tearDown(self):
        """ unsubscribe from bfd events (if vpp is alive) and drain the
        event queue """
        if not self.vpp_dead:
            self.vapi.want_bfd_events(enable_disable=0)
        self.vapi.collect_events()  # clear the event queue
        super(BFDAuthOnOffTestCase, self).tearDown()

    def _exchange_packets(self, verify_up=True, inc_seq_num=False):
        """ wait for and answer 2 * detect_mult packets from vpp

        :param verify_up: when true, assert that every received packet
            carries BFDState.up
        :param inc_seq_num: when true, call inc_seq_num() on the test
            session before each reply is sent
        """
        for _ in range(self.test_session.detect_mult * 2):
            p = wait_for_bfd_packet(self)
            if verify_up:
                self.assert_equal(p[BFD].state, BFDState.up, BFDState)
            if inc_seq_num:
                self.test_session.inc_seq_num()
            self.test_session.send_packet()

    def _verify_session_undisturbed(self):
        """ assert that the vpp session is up and no bfd event is queued """
        self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
        self.assert_equal(len(self.vapi.collect_events()), 0,
                          "number of bfd events")

    def test_auth_on_immediate(self):
        """ turn auth on without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        self._exchange_packets()
        # switch both sides to the key at once
        self.vpp_session.activate_auth(key)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_off_immediate(self):
        """ turn auth off without disturbing session state (immediate) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets(inc_seq_num=True)
        # drop authentication on both sides at once
        self.vpp_session.deactivate_auth()
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self._exchange_packets(inc_seq_num=True)
        self._verify_session_undisturbed()

    def test_auth_change_key_immediate(self):
        """ change auth key without disturbing session state (immediate) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        # switch both sides from key1 to key2 at once
        self.vpp_session.activate_auth(key2)
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_on_delayed(self):
        """ turn auth on without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(self, self.pg0, AF_INET)
        bfd_session_up(self)
        # the first round does not check the state of received packets
        self._exchange_packets(verify_up=False)
        self.vpp_session.activate_auth(key, delayed=True)
        # the test session keeps sending without the key for a while
        self._exchange_packets()
        # now the test session starts using the key as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_off_delayed(self):
        """ turn auth off without disturbing session state (delayed) """
        key = self.factory.create_random_key(self)
        key.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key)
        self.vpp_session.add_vpp_config()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.deactivate_auth(delayed=True)
        # the test session keeps sending with the key for a while
        self._exchange_packets()
        # now the test session stops using the key as well
        self.test_session.bfd_key_id = None
        self.test_session.sha1_key = None
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()

    def test_auth_change_key_delayed(self):
        """ change auth key without disturbing session state (delayed) """
        key1 = self.factory.create_random_key(self)
        key1.add_vpp_config()
        key2 = self.factory.create_random_key(self)
        key2.add_vpp_config()
        self.vpp_session = VppBFDUDPSession(self, self.pg0,
                                            self.pg0.remote_ip4, sha1_key=key1)
        self.vpp_session.add_vpp_config()
        self.vpp_session.admin_up()
        self.test_session = BFDTestSession(
            self, self.pg0, AF_INET, sha1_key=key1,
            bfd_key_id=self.vpp_session.bfd_key_id)
        bfd_session_up(self)
        self._exchange_packets()
        self.vpp_session.activate_auth(key2, delayed=True)
        # the test session keeps sending with key1 for a while
        self._exchange_packets()
        # now the test session switches to key2 as well
        self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
        self.test_session.sha1_key = key2
        self.test_session.send_packet()
        self._exchange_packets()
        self._verify_session_undisturbed()
2346
2347
2348 @unittest.skipUnless(running_extended_tests, "part of extended tests")
2349 class BFDCLITestCase(VppTestCase):
2350     """Bidirectional Forwarding Detection (BFD) (CLI) """
2351     pg0 = None
2352
2353     @classmethod
2354     def setUpClass(cls):
2355         super(BFDCLITestCase, cls).setUpClass()
2356         cls.vapi.cli("set log class bfd level debug")
2357         try:
2358             cls.create_pg_interfaces((0,))
2359             cls.pg0.config_ip4()
2360             cls.pg0.config_ip6()
2361             cls.pg0.resolve_arp()
2362             cls.pg0.resolve_ndp()
2363
2364         except Exception:
2365             super(BFDCLITestCase, cls).tearDownClass()
2366             raise
2367
2368     @classmethod
2369     def tearDownClass(cls):
2370         super(BFDCLITestCase, cls).tearDownClass()
2371
2372     def setUp(self):
2373         super(BFDCLITestCase, self).setUp()
2374         self.factory = AuthKeyFactory()
2375         self.pg0.enable_capture()
2376
2377     def tearDown(self):
2378         try:
2379             self.vapi.want_bfd_events(enable_disable=0)
2380         except UnexpectedApiReturnValueError:
2381             # some tests aren't subscribed, so this is not an issue
2382             pass
2383         self.vapi.collect_events()  # clear the event queue
2384         super(BFDCLITestCase, self).tearDown()
2385
2386     def cli_verify_no_response(self, cli):
2387         """ execute a CLI, asserting that the response is empty """
2388         self.assert_equal(self.vapi.cli(cli),
2389                           b"",
2390                           "CLI command response")
2391
2392     def cli_verify_response(self, cli, expected):
2393         """ execute a CLI, asserting that the response matches expectation """
2394         self.assert_equal(self.vapi.cli(cli).strip(),
2395                           expected,
2396                           "CLI command response")
2397
2398     def test_show(self):
2399         """ show commands """
2400         k1 = self.factory.create_random_key(self)
2401         k1.add_vpp_config()
2402         k2 = self.factory.create_random_key(
2403             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2404         k2.add_vpp_config()
2405         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2406         s1.add_vpp_config()
2407         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2408                               sha1_key=k2)
2409         s2.add_vpp_config()
2410         self.logger.info(self.vapi.ppcli("show bfd keys"))
2411         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2412         self.logger.info(self.vapi.ppcli("show bfd"))
2413
2414     def test_set_del_sha1_key(self):
2415         """ set/delete SHA1 auth key """
2416         k = self.factory.create_random_key(self)
2417         self.registry.register(k, self.logger)
2418         self.cli_verify_no_response(
2419             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2420             (k.conf_key_id,
2421                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2422         self.assertTrue(k.query_vpp_config())
2423         self.vpp_session = VppBFDUDPSession(
2424             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2425         self.vpp_session.add_vpp_config()
2426         self.test_session = \
2427             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2428                            bfd_key_id=self.vpp_session.bfd_key_id)
2429         self.vapi.want_bfd_events()
2430         bfd_session_up(self)
2431         bfd_session_down(self)
2432         # try to replace the secret for the key - should fail because the key
2433         # is in-use
2434         k2 = self.factory.create_random_key(self)
2435         self.cli_verify_response(
2436             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2437             (k.conf_key_id,
2438                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2439             "bfd key set: `bfd_auth_set_key' API call failed, "
2440             "rv=-103:BFD object in use")
2441         # manipulating the session using old secret should still work
2442         bfd_session_up(self)
2443         bfd_session_down(self)
2444         self.vpp_session.remove_vpp_config()
2445         self.cli_verify_no_response(
2446             "bfd key del conf-key-id %s" % k.conf_key_id)
2447         self.assertFalse(k.query_vpp_config())
2448
2449     def test_set_del_meticulous_sha1_key(self):
2450         """ set/delete meticulous SHA1 auth key """
2451         k = self.factory.create_random_key(
2452             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2453         self.registry.register(k, self.logger)
2454         self.cli_verify_no_response(
2455             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2456             (k.conf_key_id,
2457                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
2458         self.assertTrue(k.query_vpp_config())
2459         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2460                                             self.pg0.remote_ip6, af=AF_INET6,
2461                                             sha1_key=k)
2462         self.vpp_session.add_vpp_config()
2463         self.vpp_session.admin_up()
2464         self.test_session = \
2465             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2466                            bfd_key_id=self.vpp_session.bfd_key_id)
2467         self.vapi.want_bfd_events()
2468         bfd_session_up(self)
2469         bfd_session_down(self)
2470         # try to replace the secret for the key - should fail because the key
2471         # is in-use
2472         k2 = self.factory.create_random_key(self)
2473         self.cli_verify_response(
2474             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2475             (k.conf_key_id,
2476                 "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
2477             "bfd key set: `bfd_auth_set_key' API call failed, "
2478             "rv=-103:BFD object in use")
2479         # manipulating the session using old secret should still work
2480         bfd_session_up(self)
2481         bfd_session_down(self)
2482         self.vpp_session.remove_vpp_config()
2483         self.cli_verify_no_response(
2484             "bfd key del conf-key-id %s" % k.conf_key_id)
2485         self.assertFalse(k.query_vpp_config())
2486
2487     def test_add_mod_del_bfd_udp(self):
2488         """ create/modify/delete IPv4 BFD UDP session """
2489         vpp_session = VppBFDUDPSession(
2490             self, self.pg0, self.pg0.remote_ip4)
2491         self.registry.register(vpp_session, self.logger)
2492         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2493             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2494             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2495                                 self.pg0.remote_ip4,
2496                                 vpp_session.desired_min_tx,
2497                                 vpp_session.required_min_rx,
2498                                 vpp_session.detect_mult)
2499         self.cli_verify_no_response(cli_add_cmd)
2500         # 2nd add should fail
2501         self.cli_verify_response(
2502             cli_add_cmd,
2503             "bfd udp session add: `bfd_add_add_session' API call"
2504             " failed, rv=-101:Duplicate BFD object")
2505         verify_bfd_session_config(self, vpp_session)
2506         mod_session = VppBFDUDPSession(
2507             self, self.pg0, self.pg0.remote_ip4,
2508             required_min_rx=2 * vpp_session.required_min_rx,
2509             desired_min_tx=3 * vpp_session.desired_min_tx,
2510             detect_mult=4 * vpp_session.detect_mult)
2511         self.cli_verify_no_response(
2512             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2513             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2514             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2515              mod_session.desired_min_tx, mod_session.required_min_rx,
2516              mod_session.detect_mult))
2517         verify_bfd_session_config(self, mod_session)
2518         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2519             "peer-addr %s" % (self.pg0.name,
2520                               self.pg0.local_ip4, self.pg0.remote_ip4)
2521         self.cli_verify_no_response(cli_del_cmd)
2522         # 2nd del is expected to fail
2523         self.cli_verify_response(
2524             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2525             " failed, rv=-102:No such BFD object")
2526         self.assertFalse(vpp_session.query_vpp_config())
2527
2528     def test_add_mod_del_bfd_udp6(self):
2529         """ create/modify/delete IPv6 BFD UDP session """
2530         vpp_session = VppBFDUDPSession(
2531             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2532         self.registry.register(vpp_session, self.logger)
2533         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2534             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2535             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2536                                 self.pg0.remote_ip6,
2537                                 vpp_session.desired_min_tx,
2538                                 vpp_session.required_min_rx,
2539                                 vpp_session.detect_mult)
2540         self.cli_verify_no_response(cli_add_cmd)
2541         # 2nd add should fail
2542         self.cli_verify_response(
2543             cli_add_cmd,
2544             "bfd udp session add: `bfd_add_add_session' API call"
2545             " failed, rv=-101:Duplicate BFD object")
2546         verify_bfd_session_config(self, vpp_session)
2547         mod_session = VppBFDUDPSession(
2548             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2549             required_min_rx=2 * vpp_session.required_min_rx,
2550             desired_min_tx=3 * vpp_session.desired_min_tx,
2551             detect_mult=4 * vpp_session.detect_mult)
2552         self.cli_verify_no_response(
2553             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2554             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2555             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2556              mod_session.desired_min_tx,
2557              mod_session.required_min_rx, mod_session.detect_mult))
2558         verify_bfd_session_config(self, mod_session)
2559         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2560             "peer-addr %s" % (self.pg0.name,
2561                               self.pg0.local_ip6, self.pg0.remote_ip6)
2562         self.cli_verify_no_response(cli_del_cmd)
2563         # 2nd del is expected to fail
2564         self.cli_verify_response(
2565             cli_del_cmd,
2566             "bfd udp session del: `bfd_udp_del_session' API call"
2567             " failed, rv=-102:No such BFD object")
2568         self.assertFalse(vpp_session.query_vpp_config())
2569
2570     def test_add_mod_del_bfd_udp_auth(self):
2571         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2572         key = self.factory.create_random_key(self)
2573         key.add_vpp_config()
2574         vpp_session = VppBFDUDPSession(
2575             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2576         self.registry.register(vpp_session, self.logger)
2577         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2578             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2579             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2580             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2581                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2582                vpp_session.detect_mult, key.conf_key_id,
2583                vpp_session.bfd_key_id)
2584         self.cli_verify_no_response(cli_add_cmd)
2585         # 2nd add should fail
2586         self.cli_verify_response(
2587             cli_add_cmd,
2588             "bfd udp session add: `bfd_add_add_session' API call"
2589             " failed, rv=-101:Duplicate BFD object")
2590         verify_bfd_session_config(self, vpp_session)
2591         mod_session = VppBFDUDPSession(
2592             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2593             bfd_key_id=vpp_session.bfd_key_id,
2594             required_min_rx=2 * vpp_session.required_min_rx,
2595             desired_min_tx=3 * vpp_session.desired_min_tx,
2596             detect_mult=4 * vpp_session.detect_mult)
2597         self.cli_verify_no_response(
2598             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2599             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2600             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2601              mod_session.desired_min_tx,
2602              mod_session.required_min_rx, mod_session.detect_mult))
2603         verify_bfd_session_config(self, mod_session)
2604         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2605             "peer-addr %s" % (self.pg0.name,
2606                               self.pg0.local_ip4, self.pg0.remote_ip4)
2607         self.cli_verify_no_response(cli_del_cmd)
2608         # 2nd del is expected to fail
2609         self.cli_verify_response(
2610             cli_del_cmd,
2611             "bfd udp session del: `bfd_udp_del_session' API call"
2612             " failed, rv=-102:No such BFD object")
2613         self.assertFalse(vpp_session.query_vpp_config())
2614
2615     def test_add_mod_del_bfd_udp6_auth(self):
2616         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2617         key = self.factory.create_random_key(
2618             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2619         key.add_vpp_config()
2620         vpp_session = VppBFDUDPSession(
2621             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2622         self.registry.register(vpp_session, self.logger)
2623         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2624             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2625             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2626             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2627                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2628                vpp_session.detect_mult, key.conf_key_id,
2629                vpp_session.bfd_key_id)
2630         self.cli_verify_no_response(cli_add_cmd)
2631         # 2nd add should fail
2632         self.cli_verify_response(
2633             cli_add_cmd,
2634             "bfd udp session add: `bfd_add_add_session' API call"
2635             " failed, rv=-101:Duplicate BFD object")
2636         verify_bfd_session_config(self, vpp_session)
2637         mod_session = VppBFDUDPSession(
2638             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2639             bfd_key_id=vpp_session.bfd_key_id,
2640             required_min_rx=2 * vpp_session.required_min_rx,
2641             desired_min_tx=3 * vpp_session.desired_min_tx,
2642             detect_mult=4 * vpp_session.detect_mult)
2643         self.cli_verify_no_response(
2644             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2645             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2646             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2647              mod_session.desired_min_tx,
2648              mod_session.required_min_rx, mod_session.detect_mult))
2649         verify_bfd_session_config(self, mod_session)
2650         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2651             "peer-addr %s" % (self.pg0.name,
2652                               self.pg0.local_ip6, self.pg0.remote_ip6)
2653         self.cli_verify_no_response(cli_del_cmd)
2654         # 2nd del is expected to fail
2655         self.cli_verify_response(
2656             cli_del_cmd,
2657             "bfd udp session del: `bfd_udp_del_session' API call"
2658             " failed, rv=-102:No such BFD object")
2659         self.assertFalse(vpp_session.query_vpp_config())
2660
2661     def test_auth_on_off(self):
2662         """ turn authentication on and off """
2663         key = self.factory.create_random_key(
2664             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2665         key.add_vpp_config()
2666         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2667         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2668                                         sha1_key=key)
2669         session.add_vpp_config()
2670         cli_activate = \
2671             "bfd udp session auth activate interface %s local-addr %s "\
2672             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2673             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2674                key.conf_key_id, auth_session.bfd_key_id)
2675         self.cli_verify_no_response(cli_activate)
2676         verify_bfd_session_config(self, auth_session)
2677         self.cli_verify_no_response(cli_activate)
2678         verify_bfd_session_config(self, auth_session)
2679         cli_deactivate = \
2680             "bfd udp session auth deactivate interface %s local-addr %s "\
2681             "peer-addr %s "\
2682             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2683         self.cli_verify_no_response(cli_deactivate)
2684         verify_bfd_session_config(self, session)
2685         self.cli_verify_no_response(cli_deactivate)
2686         verify_bfd_session_config(self, session)
2687
2688     def test_auth_on_off_delayed(self):
2689         """ turn authentication on and off (delayed) """
2690         key = self.factory.create_random_key(
2691             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2692         key.add_vpp_config()
2693         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2694         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2695                                         sha1_key=key)
2696         session.add_vpp_config()
2697         cli_activate = \
2698             "bfd udp session auth activate interface %s local-addr %s "\
2699             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2700             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2701                key.conf_key_id, auth_session.bfd_key_id)
2702         self.cli_verify_no_response(cli_activate)
2703         verify_bfd_session_config(self, auth_session)
2704         self.cli_verify_no_response(cli_activate)
2705         verify_bfd_session_config(self, auth_session)
2706         cli_deactivate = \
2707             "bfd udp session auth deactivate interface %s local-addr %s "\
2708             "peer-addr %s delayed yes"\
2709             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2710         self.cli_verify_no_response(cli_deactivate)
2711         verify_bfd_session_config(self, session)
2712         self.cli_verify_no_response(cli_deactivate)
2713         verify_bfd_session_config(self, session)
2714
2715     def test_admin_up_down(self):
2716         """ put session admin-up and admin-down """
2717         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2718         session.add_vpp_config()
2719         cli_down = \
2720             "bfd udp session set-flags admin down interface %s local-addr %s "\
2721             "peer-addr %s "\
2722             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2723         cli_up = \
2724             "bfd udp session set-flags admin up interface %s local-addr %s "\
2725             "peer-addr %s "\
2726             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2727         self.cli_verify_no_response(cli_down)
2728         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2729         self.cli_verify_no_response(cli_up)
2730         verify_bfd_session_config(self, session, state=BFDState.down)
2731
2732     def test_set_del_udp_echo_source(self):
2733         """ set/del udp echo source """
2734         self.create_loopback_interfaces(1)
2735         self.loopback0 = self.lo_interfaces[0]
2736         self.loopback0.admin_up()
2737         self.cli_verify_response("show bfd echo-source",
2738                                  "UDP echo source is not set.")
2739         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2740         self.cli_verify_no_response(cli_set)
2741         self.cli_verify_response("show bfd echo-source",
2742                                  "UDP echo source is: %s\n"
2743                                  "IPv4 address usable as echo source: none\n"
2744                                  "IPv6 address usable as echo source: none" %
2745                                  self.loopback0.name)
2746         self.loopback0.config_ip4()
2747         unpacked = unpack("!L", self.loopback0.local_ip4n)
2748         echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2749         self.cli_verify_response("show bfd echo-source",
2750                                  "UDP echo source is: %s\n"
2751                                  "IPv4 address usable as echo source: %s\n"
2752                                  "IPv6 address usable as echo source: none" %
2753                                  (self.loopback0.name, echo_ip4))
2754         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2755         echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2756                                             unpacked[2], unpacked[3] ^ 1))
2757         self.loopback0.config_ip6()
2758         self.cli_verify_response("show bfd echo-source",
2759                                  "UDP echo source is: %s\n"
2760                                  "IPv4 address usable as echo source: %s\n"
2761                                  "IPv6 address usable as echo source: %s" %
2762                                  (self.loopback0.name, echo_ip4, echo_ip6))
2763         cli_del = "bfd udp echo-source del"
2764         self.cli_verify_no_response(cli_del)
2765         self.cli_verify_response("show bfd echo-source",
2766                                  "UDP echo source is not set.")
2767
# Script entry point: run this module's tests with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)