make test: improve bfd reliability
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python
2 """ BFD tests """
3
4 from __future__ import division
5 import unittest
6 import hashlib
7 import binascii
8 import time
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17     BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner, running_extended_tests
19 from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
20 from util import ppp
21 from vpp_papi_provider import UnexpectedApiReturnValueError
22 from vpp_ip_route import VppIpRoute, VppRoutePath
23
24 USEC_IN_SEC = 1000000
25
26
27 class AuthKeyFactory(object):
28     """Factory class for creating auth keys with unique conf key ID"""
29
30     def __init__(self):
31         self._conf_key_ids = {}
32
33     def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
34         """ create a random key with unique conf key id """
35         conf_key_id = randint(0, 0xFFFFFFFF)
36         while conf_key_id in self._conf_key_ids:
37             conf_key_id = randint(0, 0xFFFFFFFF)
38         self._conf_key_ids[conf_key_id] = 1
39         key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
40         return VppBFDAuthKey(test=test, auth_type=auth_type,
41                              conf_key_id=conf_key_id, key=key)
42
43
44 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
45 class BFDAPITestCase(VppTestCase):
46     """Bidirectional Forwarding Detection (BFD) - API"""
47
48     pg0 = None
49     pg1 = None
50
51     @classmethod
52     def setUpClass(cls):
53         super(BFDAPITestCase, cls).setUpClass()
54
55         try:
56             cls.create_pg_interfaces(range(2))
57             for i in cls.pg_interfaces:
58                 i.config_ip4()
59                 i.config_ip6()
60                 i.resolve_arp()
61
62         except Exception:
63             super(BFDAPITestCase, cls).tearDownClass()
64             raise
65
66     def setUp(self):
67         super(BFDAPITestCase, self).setUp()
68         self.factory = AuthKeyFactory()
69
70     def test_add_bfd(self):
71         """ create a BFD session """
72         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
73         session.add_vpp_config()
74         self.logger.debug("Session state is %s", session.state)
75         session.remove_vpp_config()
76         session.add_vpp_config()
77         self.logger.debug("Session state is %s", session.state)
78         session.remove_vpp_config()
79
80     def test_double_add(self):
81         """ create the same BFD session twice (negative case) """
82         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
83         session.add_vpp_config()
84
85         with self.vapi.expect_negative_api_retval():
86             session.add_vpp_config()
87
88         session.remove_vpp_config()
89
90     def test_add_bfd6(self):
91         """ create IPv6 BFD session """
92         session = VppBFDUDPSession(
93             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
94         session.add_vpp_config()
95         self.logger.debug("Session state is %s", session.state)
96         session.remove_vpp_config()
97         session.add_vpp_config()
98         self.logger.debug("Session state is %s", session.state)
99         session.remove_vpp_config()
100
101     def test_mod_bfd(self):
102         """ modify BFD session parameters """
103         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
104                                    desired_min_tx=50000,
105                                    required_min_rx=10000,
106                                    detect_mult=1)
107         session.add_vpp_config()
108         s = session.get_bfd_udp_session_dump_entry()
109         self.assert_equal(session.desired_min_tx,
110                           s.desired_min_tx,
111                           "desired min transmit interval")
112         self.assert_equal(session.required_min_rx,
113                           s.required_min_rx,
114                           "required min receive interval")
115         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
116         session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
117                                   required_min_rx=session.required_min_rx * 2,
118                                   detect_mult=session.detect_mult * 2)
119         s = session.get_bfd_udp_session_dump_entry()
120         self.assert_equal(session.desired_min_tx,
121                           s.desired_min_tx,
122                           "desired min transmit interval")
123         self.assert_equal(session.required_min_rx,
124                           s.required_min_rx,
125                           "required min receive interval")
126         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
127
128     def test_add_sha1_keys(self):
129         """ add SHA1 keys """
130         key_count = 10
131         keys = [self.factory.create_random_key(
132             self) for i in range(0, key_count)]
133         for key in keys:
134             self.assertFalse(key.query_vpp_config())
135         for key in keys:
136             key.add_vpp_config()
137         for key in keys:
138             self.assertTrue(key.query_vpp_config())
139         # remove randomly
140         indexes = range(key_count)
141         shuffle(indexes)
142         removed = []
143         for i in indexes:
144             key = keys[i]
145             key.remove_vpp_config()
146             removed.append(i)
147             for j in range(key_count):
148                 key = keys[j]
149                 if j in removed:
150                     self.assertFalse(key.query_vpp_config())
151                 else:
152                     self.assertTrue(key.query_vpp_config())
153         # should be removed now
154         for key in keys:
155             self.assertFalse(key.query_vpp_config())
156         # add back and remove again
157         for key in keys:
158             key.add_vpp_config()
159         for key in keys:
160             self.assertTrue(key.query_vpp_config())
161         for key in keys:
162             key.remove_vpp_config()
163         for key in keys:
164             self.assertFalse(key.query_vpp_config())
165
166     def test_add_bfd_sha1(self):
167         """ create a BFD session (SHA1) """
168         key = self.factory.create_random_key(self)
169         key.add_vpp_config()
170         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
171                                    sha1_key=key)
172         session.add_vpp_config()
173         self.logger.debug("Session state is %s", session.state)
174         session.remove_vpp_config()
175         session.add_vpp_config()
176         self.logger.debug("Session state is %s", session.state)
177         session.remove_vpp_config()
178
179     def test_double_add_sha1(self):
180         """ create the same BFD session twice (negative case) (SHA1) """
181         key = self.factory.create_random_key(self)
182         key.add_vpp_config()
183         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
184                                    sha1_key=key)
185         session.add_vpp_config()
186         with self.assertRaises(Exception):
187             session.add_vpp_config()
188
189     def test_add_auth_nonexistent_key(self):
190         """ create BFD session using non-existent SHA1 (negative case) """
191         session = VppBFDUDPSession(
192             self, self.pg0, self.pg0.remote_ip4,
193             sha1_key=self.factory.create_random_key(self))
194         with self.assertRaises(Exception):
195             session.add_vpp_config()
196
197     def test_shared_sha1_key(self):
198         """ share single SHA1 key between multiple BFD sessions """
199         key = self.factory.create_random_key(self)
200         key.add_vpp_config()
201         sessions = [
202             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
203                              sha1_key=key),
204             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
205                              sha1_key=key, af=AF_INET6),
206             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
207                              sha1_key=key),
208             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
209                              sha1_key=key, af=AF_INET6)]
210         for s in sessions:
211             s.add_vpp_config()
212         removed = 0
213         for s in sessions:
214             e = key.get_bfd_auth_keys_dump_entry()
215             self.assert_equal(e.use_count, len(sessions) - removed,
216                               "Use count for shared key")
217             s.remove_vpp_config()
218             removed += 1
219         e = key.get_bfd_auth_keys_dump_entry()
220         self.assert_equal(e.use_count, len(sessions) - removed,
221                           "Use count for shared key")
222
223     def test_activate_auth(self):
224         """ activate SHA1 authentication """
225         key = self.factory.create_random_key(self)
226         key.add_vpp_config()
227         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
228         session.add_vpp_config()
229         session.activate_auth(key)
230
231     def test_deactivate_auth(self):
232         """ deactivate SHA1 authentication """
233         key = self.factory.create_random_key(self)
234         key.add_vpp_config()
235         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
236         session.add_vpp_config()
237         session.activate_auth(key)
238         session.deactivate_auth()
239
240     def test_change_key(self):
241         """ change SHA1 key """
242         key1 = self.factory.create_random_key(self)
243         key2 = self.factory.create_random_key(self)
244         while key2.conf_key_id == key1.conf_key_id:
245             key2 = self.factory.create_random_key(self)
246         key1.add_vpp_config()
247         key2.add_vpp_config()
248         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
249                                    sha1_key=key1)
250         session.add_vpp_config()
251         session.activate_auth(key2)
252
253
254 class BFDTestSession(object):
255     """ BFD session as seen from test framework side """
256
257     def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
258                  bfd_key_id=None, our_seq_number=None):
259         self.test = test
260         self.af = af
261         self.sha1_key = sha1_key
262         self.bfd_key_id = bfd_key_id
263         self.interface = interface
264         self.udp_sport = randint(49152, 65535)
265         if our_seq_number is None:
266             self.our_seq_number = randint(0, 40000000)
267         else:
268             self.our_seq_number = our_seq_number
269         self.vpp_seq_number = None
270         self.my_discriminator = 0
271         self.desired_min_tx = 300000
272         self.required_min_rx = 300000
273         self.required_min_echo_rx = None
274         self.detect_mult = detect_mult
275         self.diag = BFDDiagCode.no_diagnostic
276         self.your_discriminator = None
277         self.state = BFDState.down
278         self.auth_type = BFDAuthType.no_auth
279
280     def inc_seq_num(self):
281         """ increment sequence number, wrapping if needed """
282         if self.our_seq_number == 0xFFFFFFFF:
283             self.our_seq_number = 0
284         else:
285             self.our_seq_number += 1
286
287     def update(self, my_discriminator=None, your_discriminator=None,
288                desired_min_tx=None, required_min_rx=None,
289                required_min_echo_rx=None, detect_mult=None,
290                diag=None, state=None, auth_type=None):
291         """ update BFD parameters associated with session """
292         if my_discriminator is not None:
293             self.my_discriminator = my_discriminator
294         if your_discriminator is not None:
295             self.your_discriminator = your_discriminator
296         if required_min_rx is not None:
297             self.required_min_rx = required_min_rx
298         if required_min_echo_rx is not None:
299             self.required_min_echo_rx = required_min_echo_rx
300         if desired_min_tx is not None:
301             self.desired_min_tx = desired_min_tx
302         if detect_mult is not None:
303             self.detect_mult = detect_mult
304         if diag is not None:
305             self.diag = diag
306         if state is not None:
307             self.state = state
308         if auth_type is not None:
309             self.auth_type = auth_type
310
311     def fill_packet_fields(self, packet):
312         """ set packet fields with known values in packet """
313         bfd = packet[BFD]
314         if self.my_discriminator:
315             self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
316                                    self.my_discriminator)
317             bfd.my_discriminator = self.my_discriminator
318         if self.your_discriminator:
319             self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
320                                    self.your_discriminator)
321             bfd.your_discriminator = self.your_discriminator
322         if self.required_min_rx:
323             self.test.logger.debug(
324                 "BFD: setting packet.required_min_rx_interval=%s",
325                 self.required_min_rx)
326             bfd.required_min_rx_interval = self.required_min_rx
327         if self.required_min_echo_rx:
328             self.test.logger.debug(
329                 "BFD: setting packet.required_min_echo_rx=%s",
330                 self.required_min_echo_rx)
331             bfd.required_min_echo_rx_interval = self.required_min_echo_rx
332         if self.desired_min_tx:
333             self.test.logger.debug(
334                 "BFD: setting packet.desired_min_tx_interval=%s",
335                 self.desired_min_tx)
336             bfd.desired_min_tx_interval = self.desired_min_tx
337         if self.detect_mult:
338             self.test.logger.debug(
339                 "BFD: setting packet.detect_mult=%s", self.detect_mult)
340             bfd.detect_mult = self.detect_mult
341         if self.diag:
342             self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
343             bfd.diag = self.diag
344         if self.state:
345             self.test.logger.debug("BFD: setting packet.state=%s", self.state)
346             bfd.state = self.state
347         if self.auth_type:
348             # this is used by a negative test-case
349             self.test.logger.debug("BFD: setting packet.auth_type=%s",
350                                    self.auth_type)
351             bfd.auth_type = self.auth_type
352
353     def create_packet(self):
354         """ create a BFD packet, reflecting the current state of session """
355         if self.sha1_key:
356             bfd = BFD(flags="A")
357             bfd.auth_type = self.sha1_key.auth_type
358             bfd.auth_len = BFD.sha1_auth_len
359             bfd.auth_key_id = self.bfd_key_id
360             bfd.auth_seq_num = self.our_seq_number
361             bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
362         else:
363             bfd = BFD()
364         if self.af == AF_INET6:
365             packet = (Ether(src=self.interface.remote_mac,
366                             dst=self.interface.local_mac) /
367                       IPv6(src=self.interface.remote_ip6,
368                            dst=self.interface.local_ip6,
369                            hlim=255) /
370                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
371                       bfd)
372         else:
373             packet = (Ether(src=self.interface.remote_mac,
374                             dst=self.interface.local_mac) /
375                       IP(src=self.interface.remote_ip4,
376                          dst=self.interface.local_ip4,
377                          ttl=255) /
378                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
379                       bfd)
380         self.test.logger.debug("BFD: Creating packet")
381         self.fill_packet_fields(packet)
382         if self.sha1_key:
383             hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
384                 "\0" * (20 - len(self.sha1_key.key))
385             self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
386                                    hashlib.sha1(hash_material).hexdigest())
387             packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
388         return packet
389
390     def send_packet(self, packet=None, interface=None):
391         """ send packet on interface, creating the packet if needed """
392         if packet is None:
393             packet = self.create_packet()
394         if interface is None:
395             interface = self.test.pg0
396         self.test.logger.debug(ppp("Sending packet:", packet))
397         interface.add_stream(packet)
398         self.test.pg_start()
399
400     def verify_sha1_auth(self, packet):
401         """ Verify correctness of authentication in BFD layer. """
402         bfd = packet[BFD]
403         self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
404         self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
405                                BFDAuthType)
406         self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
407         self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
408         if self.vpp_seq_number is None:
409             self.vpp_seq_number = bfd.auth_seq_num
410             self.test.logger.debug("Received initial sequence number: %s" %
411                                    self.vpp_seq_number)
412         else:
413             recvd_seq_num = bfd.auth_seq_num
414             self.test.logger.debug("Received followup sequence number: %s" %
415                                    recvd_seq_num)
416             if self.vpp_seq_number < 0xffffffff:
417                 if self.sha1_key.auth_type == \
418                         BFDAuthType.meticulous_keyed_sha1:
419                     self.test.assert_equal(recvd_seq_num,
420                                            self.vpp_seq_number + 1,
421                                            "BFD sequence number")
422                 else:
423                     self.test.assert_in_range(recvd_seq_num,
424                                               self.vpp_seq_number,
425                                               self.vpp_seq_number + 1,
426                                               "BFD sequence number")
427             else:
428                 if self.sha1_key.auth_type == \
429                         BFDAuthType.meticulous_keyed_sha1:
430                     self.test.assert_equal(recvd_seq_num, 0,
431                                            "BFD sequence number")
432                 else:
433                     self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
434                                        "BFD sequence number not one of "
435                                        "(%s, 0)" % self.vpp_seq_number)
436             self.vpp_seq_number = recvd_seq_num
437         # last 20 bytes represent the hash - so replace them with the key,
438         # pad the result with zeros and hash the result
439         hash_material = bfd.original[:-20] + self.sha1_key.key + \
440             "\0" * (20 - len(self.sha1_key.key))
441         expected_hash = hashlib.sha1(hash_material).hexdigest()
442         self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
443                                expected_hash, "Auth key hash")
444
445     def verify_bfd(self, packet):
446         """ Verify correctness of BFD layer. """
447         bfd = packet[BFD]
448         self.test.assert_equal(bfd.version, 1, "BFD version")
449         self.test.assert_equal(bfd.your_discriminator,
450                                self.my_discriminator,
451                                "BFD - your discriminator")
452         if self.sha1_key:
453             self.verify_sha1_auth(packet)
454
455
456 def bfd_session_up(test):
457     """ Bring BFD session up """
458     test.logger.info("BFD: Waiting for slow hello")
459     p = wait_for_bfd_packet(test, 2)
460     old_offset = None
461     if hasattr(test, 'vpp_clock_offset'):
462         old_offset = test.vpp_clock_offset
463     test.vpp_clock_offset = time.time() - p.time
464     test.logger.debug("BFD: Calculated vpp clock offset: %s",
465                       test.vpp_clock_offset)
466     if old_offset:
467         test.assertAlmostEqual(
468             old_offset, test.vpp_clock_offset, delta=0.5,
469             msg="vpp clock offset not stable (new: %s, old: %s)" %
470             (test.vpp_clock_offset, old_offset))
471     test.logger.info("BFD: Sending Init")
472     test.test_session.update(my_discriminator=randint(0, 40000000),
473                              your_discriminator=p[BFD].my_discriminator,
474                              state=BFDState.init)
475     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
476             BFDAuthType.meticulous_keyed_sha1:
477         test.test_session.inc_seq_num()
478     test.test_session.send_packet()
479     test.logger.info("BFD: Waiting for event")
480     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
481     verify_event(test, e, expected_state=BFDState.up)
482     test.logger.info("BFD: Session is Up")
483     test.test_session.update(state=BFDState.up)
484     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
485             BFDAuthType.meticulous_keyed_sha1:
486         test.test_session.inc_seq_num()
487     test.test_session.send_packet()
488     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
489
490
491 def bfd_session_down(test):
492     """ Bring BFD session down """
493     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
494     test.test_session.update(state=BFDState.down)
495     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
496             BFDAuthType.meticulous_keyed_sha1:
497         test.test_session.inc_seq_num()
498     test.test_session.send_packet()
499     test.logger.info("BFD: Waiting for event")
500     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
501     verify_event(test, e, expected_state=BFDState.down)
502     test.logger.info("BFD: Session is Down")
503     test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
504
505
506 def verify_bfd_session_config(test, session, state=None):
507     dump = session.get_bfd_udp_session_dump_entry()
508     test.assertIsNotNone(dump)
509     # since dump is not none, we have verified that sw_if_index and addresses
510     # are valid (in get_bfd_udp_session_dump_entry)
511     if state:
512         test.assert_equal(dump.state, state, "session state")
513     test.assert_equal(dump.required_min_rx, session.required_min_rx,
514                       "required min rx interval")
515     test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
516                       "desired min tx interval")
517     test.assert_equal(dump.detect_mult, session.detect_mult,
518                       "detect multiplier")
519     if session.sha1_key is None:
520         test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
521     else:
522         test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
523         test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
524                           "bfd key id")
525         test.assert_equal(dump.conf_key_id,
526                           session.sha1_key.conf_key_id,
527                           "config key id")
528
529
530 def verify_ip(test, packet):
531     """ Verify correctness of IP layer. """
532     if test.vpp_session.af == AF_INET6:
533         ip = packet[IPv6]
534         local_ip = test.pg0.local_ip6
535         remote_ip = test.pg0.remote_ip6
536         test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
537     else:
538         ip = packet[IP]
539         local_ip = test.pg0.local_ip4
540         remote_ip = test.pg0.remote_ip4
541         test.assert_equal(ip.ttl, 255, "IPv4 TTL")
542     test.assert_equal(ip.src, local_ip, "IP source address")
543     test.assert_equal(ip.dst, remote_ip, "IP destination address")
544
545
546 def verify_udp(test, packet):
547     """ Verify correctness of UDP layer. """
548     udp = packet[UDP]
549     test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
550     test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
551                          "UDP source port")
552
553
554 def verify_event(test, event, expected_state):
555     """ Verify correctness of event values. """
556     e = event
557     test.logger.debug("BFD: Event: %s" % repr(e))
558     test.assert_equal(e.sw_if_index,
559                       test.vpp_session.interface.sw_if_index,
560                       "BFD interface index")
561     is_ipv6 = 0
562     if test.vpp_session.af == AF_INET6:
563         is_ipv6 = 1
564     test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
565     if test.vpp_session.af == AF_INET:
566         test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
567                           "Local IPv4 address")
568         test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
569                           "Peer IPv4 address")
570     else:
571         test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
572                           "Local IPv6 address")
573         test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
574                           "Peer IPv6 address")
575     test.assert_equal(e.state, expected_state, BFDState)
576
577
578 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
579     """ wait for BFD packet and verify its correctness
580
581     :param timeout: how long to wait
582     :param pcap_time_min: ignore packets with pcap timestamp lower than this
583
584     :returns: tuple (packet, time spent waiting for packet)
585     """
586     test.logger.info("BFD: Waiting for BFD packet")
587     deadline = time.time() + timeout
588     counter = 0
589     while True:
590         counter += 1
591         # sanity check
592         test.assert_in_range(counter, 0, 100, "number of packets ignored")
593         time_left = deadline - time.time()
594         if time_left < 0:
595             raise CaptureTimeoutError("Packet did not arrive within timeout")
596         p = test.pg0.wait_for_packet(timeout=time_left)
597         test.logger.debug(ppp("BFD: Got packet:", p))
598         if pcap_time_min is not None and p.time < pcap_time_min:
599             test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
600                                   "pcap time min %s):" %
601                                   (p.time, pcap_time_min), p))
602         else:
603             break
604     bfd = p[BFD]
605     if bfd is None:
606         raise Exception(ppp("Unexpected or invalid BFD packet:", p))
607     if bfd.payload:
608         raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
609     verify_ip(test, p)
610     verify_udp(test, p)
611     test.test_session.verify_bfd(p)
612     return p
613
614
615 class BFD4TestCase(VppTestCase):
616     """Bidirectional Forwarding Detection (BFD)"""
617
618     pg0 = None
619     vpp_clock_offset = None
620     vpp_session = None
621     test_session = None
622
623     @classmethod
624     def setUpClass(cls):
625         super(BFD4TestCase, cls).setUpClass()
626         try:
627             cls.create_pg_interfaces([0])
628             cls.create_loopback_interfaces([0])
629             cls.loopback0 = cls.lo_interfaces[0]
630             cls.loopback0.config_ip4()
631             cls.loopback0.admin_up()
632             cls.pg0.config_ip4()
633             cls.pg0.configure_ipv4_neighbors()
634             cls.pg0.admin_up()
635             cls.pg0.resolve_arp()
636
637         except Exception:
638             super(BFD4TestCase, cls).tearDownClass()
639             raise
640
641     def setUp(self):
642         super(BFD4TestCase, self).setUp()
643         self.factory = AuthKeyFactory()
644         self.vapi.want_bfd_events()
645         self.pg0.enable_capture()
646         try:
647             self.vpp_session = VppBFDUDPSession(self, self.pg0,
648                                                 self.pg0.remote_ip4)
649             self.vpp_session.add_vpp_config()
650             self.vpp_session.admin_up()
651             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
652         except:
653             self.vapi.want_bfd_events(enable_disable=0)
654             raise
655
656     def tearDown(self):
657         if not self.vpp_dead:
658             self.vapi.want_bfd_events(enable_disable=0)
659         self.vapi.collect_events()  # clear the event queue
660         super(BFD4TestCase, self).tearDown()
661
662     def test_session_up(self):
663         """ bring BFD session up """
664         bfd_session_up(self)
665
666     def test_session_up_by_ip(self):
667         """ bring BFD session up - first frame looked up by address pair """
668         self.logger.info("BFD: Sending Slow control frame")
669         self.test_session.update(my_discriminator=randint(0, 40000000))
670         self.test_session.send_packet()
671         self.pg0.enable_capture()
672         p = self.pg0.wait_for_packet(1)
673         self.assert_equal(p[BFD].your_discriminator,
674                           self.test_session.my_discriminator,
675                           "BFD - your discriminator")
676         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
677         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
678                                  state=BFDState.up)
679         self.logger.info("BFD: Waiting for event")
680         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
681         verify_event(self, e, expected_state=BFDState.init)
682         self.logger.info("BFD: Sending Up")
683         self.test_session.send_packet()
684         self.logger.info("BFD: Waiting for event")
685         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
686         verify_event(self, e, expected_state=BFDState.up)
687         self.logger.info("BFD: Session is Up")
688         self.test_session.update(state=BFDState.up)
689         self.test_session.send_packet()
690         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
691
692     def test_session_down(self):
693         """ bring BFD session down """
694         bfd_session_up(self)
695         bfd_session_down(self)
696
697     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
698     def test_hold_up(self):
699         """ hold BFD session up """
700         bfd_session_up(self)
701         for dummy in range(self.test_session.detect_mult * 2):
702             wait_for_bfd_packet(self)
703             self.test_session.send_packet()
704         self.assert_equal(len(self.vapi.collect_events()), 0,
705                           "number of bfd events")
706
707     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
708     def test_slow_timer(self):
709         """ verify slow periodic control frames while session down """
710         packet_count = 3
711         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
712         prev_packet = wait_for_bfd_packet(self, 2)
713         for dummy in range(packet_count):
714             next_packet = wait_for_bfd_packet(self, 2)
715             time_diff = next_packet.time - prev_packet.time
716             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
717             # to work around timing issues
718             self.assert_in_range(
719                 time_diff, 0.70, 1.05, "time between slow packets")
720             prev_packet = next_packet
721
722     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
723     def test_zero_remote_min_rx(self):
724         """ no packets when zero remote required min rx interval """
725         bfd_session_up(self)
726         self.test_session.update(required_min_rx=0)
727         self.test_session.send_packet()
728         for dummy in range(self.test_session.detect_mult):
729             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
730                        "sleep before transmitting bfd packet")
731             self.test_session.send_packet()
732             try:
733                 p = wait_for_bfd_packet(self, timeout=0)
734                 self.logger.error(ppp("Received unexpected packet:", p))
735             except CaptureTimeoutError:
736                 pass
737         self.assert_equal(
738             len(self.vapi.collect_events()), 0, "number of bfd events")
739         self.test_session.update(required_min_rx=300000)
740         for dummy in range(3):
741             self.test_session.send_packet()
742             wait_for_bfd_packet(
743                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
744         self.assert_equal(
745             len(self.vapi.collect_events()), 0, "number of bfd events")
746
747     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
748     def test_conn_down(self):
749         """ verify session goes down after inactivity """
750         bfd_session_up(self)
751         detection_time = self.test_session.detect_mult *\
752             self.vpp_session.required_min_rx / USEC_IN_SEC
753         self.sleep(detection_time, "waiting for BFD session time-out")
754         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
755         verify_event(self, e, expected_state=BFDState.down)
756
757     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
758     def test_large_required_min_rx(self):
759         """ large remote required min rx interval """
760         bfd_session_up(self)
761         p = wait_for_bfd_packet(self)
762         interval = 3000000
763         self.test_session.update(required_min_rx=interval)
764         self.test_session.send_packet()
765         time_mark = time.time()
766         count = 0
767         # busy wait here, trying to collect a packet or event, vpp is not
768         # allowed to send packets and the session will timeout first - so the
769         # Up->Down event must arrive before any packets do
770         while time.time() < time_mark + interval / USEC_IN_SEC:
771             try:
772                 p = wait_for_bfd_packet(self, timeout=0)
773                 # if vpp managed to send a packet before we did the session
774                 # session update, then that's fine, ignore it
775                 if p.time < time_mark - self.vpp_clock_offset:
776                     continue
777                 self.logger.error(ppp("Received unexpected packet:", p))
778                 count += 1
779             except CaptureTimeoutError:
780                 pass
781             events = self.vapi.collect_events()
782             if len(events) > 0:
783                 verify_event(self, events[0], BFDState.down)
784                 break
785         self.assert_equal(count, 0, "number of packets received")
786
787     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
788     def test_immediate_remote_min_rx_reduction(self):
789         """ immediately honor remote required min rx reduction """
790         self.vpp_session.remove_vpp_config()
791         self.vpp_session = VppBFDUDPSession(
792             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
793         self.pg0.enable_capture()
794         self.vpp_session.add_vpp_config()
795         self.test_session.update(desired_min_tx=1000000,
796                                  required_min_rx=1000000)
797         bfd_session_up(self)
798         reference_packet = wait_for_bfd_packet(self)
799         time_mark = time.time()
800         interval = 300000
801         self.test_session.update(required_min_rx=interval)
802         self.test_session.send_packet()
803         extra_time = time.time() - time_mark
804         p = wait_for_bfd_packet(self)
805         # first packet is allowed to be late by time we spent doing the update
806         # calculated in extra_time
807         self.assert_in_range(p.time - reference_packet.time,
808                              .95 * 0.75 * interval / USEC_IN_SEC,
809                              1.05 * interval / USEC_IN_SEC + extra_time,
810                              "time between BFD packets")
811         reference_packet = p
812         for dummy in range(3):
813             p = wait_for_bfd_packet(self)
814             diff = p.time - reference_packet.time
815             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
816                                  1.05 * interval / USEC_IN_SEC,
817                                  "time between BFD packets")
818             reference_packet = p
819
820     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
821     def test_modify_req_min_rx_double(self):
822         """ modify session - double required min rx """
823         bfd_session_up(self)
824         p = wait_for_bfd_packet(self)
825         self.test_session.update(desired_min_tx=10000,
826                                  required_min_rx=10000)
827         self.test_session.send_packet()
828         # double required min rx
829         self.vpp_session.modify_parameters(
830             required_min_rx=2 * self.vpp_session.required_min_rx)
831         p = wait_for_bfd_packet(
832             self, pcap_time_min=time.time() - self.vpp_clock_offset)
833         # poll bit needs to be set
834         self.assertIn("P", p.sprintf("%BFD.flags%"),
835                       "Poll bit not set in BFD packet")
836         # finish poll sequence with final packet
837         final = self.test_session.create_packet()
838         final[BFD].flags = "F"
839         timeout = self.test_session.detect_mult * \
840             max(self.test_session.desired_min_tx,
841                 self.vpp_session.required_min_rx) / USEC_IN_SEC
842         self.test_session.send_packet(final)
843         time_mark = time.time()
844         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
845         verify_event(self, e, expected_state=BFDState.down)
846         time_to_event = time.time() - time_mark
847         self.assert_in_range(time_to_event, .9 * timeout,
848                              1.1 * timeout, "session timeout")
849
850     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
851     def test_modify_req_min_rx_halve(self):
852         """ modify session - halve required min rx """
853         self.vpp_session.modify_parameters(
854             required_min_rx=2 * self.vpp_session.required_min_rx)
855         bfd_session_up(self)
856         p = wait_for_bfd_packet(self)
857         self.test_session.update(desired_min_tx=10000,
858                                  required_min_rx=10000)
859         self.test_session.send_packet()
860         p = wait_for_bfd_packet(
861             self, pcap_time_min=time.time() - self.vpp_clock_offset)
862         # halve required min rx
863         old_required_min_rx = self.vpp_session.required_min_rx
864         self.vpp_session.modify_parameters(
865             required_min_rx=0.5 * self.vpp_session.required_min_rx)
866         # now we wait 0.8*3*old-req-min-rx and the session should still be up
867         self.sleep(0.8 * self.vpp_session.detect_mult *
868                    old_required_min_rx / USEC_IN_SEC,
869                    "wait before finishing poll sequence")
870         self.assert_equal(len(self.vapi.collect_events()), 0,
871                           "number of bfd events")
872         p = wait_for_bfd_packet(self)
873         # poll bit needs to be set
874         self.assertIn("P", p.sprintf("%BFD.flags%"),
875                       "Poll bit not set in BFD packet")
876         # finish poll sequence with final packet
877         final = self.test_session.create_packet()
878         final[BFD].flags = "F"
879         self.test_session.send_packet(final)
880         # now the session should time out under new conditions
881         detection_time = self.test_session.detect_mult *\
882             self.vpp_session.required_min_rx / USEC_IN_SEC
883         before = time.time()
884         e = self.vapi.wait_for_event(
885             2 * detection_time, "bfd_udp_session_details")
886         after = time.time()
887         self.assert_in_range(after - before,
888                              0.9 * detection_time,
889                              1.1 * detection_time,
890                              "time before bfd session goes down")
891         verify_event(self, e, expected_state=BFDState.down)
892
893     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
894     def test_modify_detect_mult(self):
895         """ modify detect multiplier """
896         bfd_session_up(self)
897         p = wait_for_bfd_packet(self)
898         self.vpp_session.modify_parameters(detect_mult=1)
899         p = wait_for_bfd_packet(
900             self, pcap_time_min=time.time() - self.vpp_clock_offset)
901         self.assert_equal(self.vpp_session.detect_mult,
902                           p[BFD].detect_mult,
903                           "detect mult")
904         # poll bit must not be set
905         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
906                          "Poll bit not set in BFD packet")
907         self.vpp_session.modify_parameters(detect_mult=10)
908         p = wait_for_bfd_packet(
909             self, pcap_time_min=time.time() - self.vpp_clock_offset)
910         self.assert_equal(self.vpp_session.detect_mult,
911                           p[BFD].detect_mult,
912                           "detect mult")
913         # poll bit must not be set
914         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
915                          "Poll bit not set in BFD packet")
916
917     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
918     def test_queued_poll(self):
919         """ test poll sequence queueing """
920         bfd_session_up(self)
921         p = wait_for_bfd_packet(self)
922         self.vpp_session.modify_parameters(
923             required_min_rx=2 * self.vpp_session.required_min_rx)
924         p = wait_for_bfd_packet(self)
925         poll_sequence_start = time.time()
926         poll_sequence_length_min = 0.5
927         send_final_after = time.time() + poll_sequence_length_min
928         # poll bit needs to be set
929         self.assertIn("P", p.sprintf("%BFD.flags%"),
930                       "Poll bit not set in BFD packet")
931         self.assert_equal(p[BFD].required_min_rx_interval,
932                           self.vpp_session.required_min_rx,
933                           "BFD required min rx interval")
934         self.vpp_session.modify_parameters(
935             required_min_rx=2 * self.vpp_session.required_min_rx)
936         # 2nd poll sequence should be queued now
937         # don't send the reply back yet, wait for some time to emulate
938         # longer round-trip time
939         packet_count = 0
940         while time.time() < send_final_after:
941             self.test_session.send_packet()
942             p = wait_for_bfd_packet(self)
943             self.assert_equal(len(self.vapi.collect_events()), 0,
944                               "number of bfd events")
945             self.assert_equal(p[BFD].required_min_rx_interval,
946                               self.vpp_session.required_min_rx,
947                               "BFD required min rx interval")
948             packet_count += 1
949             # poll bit must be set
950             self.assertIn("P", p.sprintf("%BFD.flags%"),
951                           "Poll bit not set in BFD packet")
952         final = self.test_session.create_packet()
953         final[BFD].flags = "F"
954         self.test_session.send_packet(final)
955         # finish 1st with final
956         poll_sequence_length = time.time() - poll_sequence_start
957         # vpp must wait for some time before starting new poll sequence
958         poll_no_2_started = False
959         for dummy in range(2 * packet_count):
960             p = wait_for_bfd_packet(self)
961             self.assert_equal(len(self.vapi.collect_events()), 0,
962                               "number of bfd events")
963             if "P" in p.sprintf("%BFD.flags%"):
964                 poll_no_2_started = True
965                 if time.time() < poll_sequence_start + poll_sequence_length:
966                     raise Exception("VPP started 2nd poll sequence too soon")
967                 final = self.test_session.create_packet()
968                 final[BFD].flags = "F"
969                 self.test_session.send_packet(final)
970                 break
971             else:
972                 self.test_session.send_packet()
973         self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
974         # finish 2nd with final
975         final = self.test_session.create_packet()
976         final[BFD].flags = "F"
977         self.test_session.send_packet(final)
978         p = wait_for_bfd_packet(self)
979         # poll bit must not be set
980         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
981                          "Poll bit set in BFD packet")
982
983     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
984     def test_poll_response(self):
985         """ test correct response to control frame with poll bit set """
986         bfd_session_up(self)
987         poll = self.test_session.create_packet()
988         poll[BFD].flags = "P"
989         self.test_session.send_packet(poll)
990         final = wait_for_bfd_packet(
991             self, pcap_time_min=time.time() - self.vpp_clock_offset)
992         self.assertIn("F", final.sprintf("%BFD.flags%"))
993
994     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
995     def test_no_periodic_if_remote_demand(self):
996         """ no periodic frames outside poll sequence if remote demand set """
997         bfd_session_up(self)
998         demand = self.test_session.create_packet()
999         demand[BFD].flags = "D"
1000         self.test_session.send_packet(demand)
1001         transmit_time = 0.9 \
1002             * max(self.vpp_session.required_min_rx,
1003                   self.test_session.desired_min_tx) \
1004             / USEC_IN_SEC
1005         count = 0
1006         for dummy in range(self.test_session.detect_mult * 2):
1007             time.sleep(transmit_time)
1008             self.test_session.send_packet(demand)
1009             try:
1010                 p = wait_for_bfd_packet(self, timeout=0)
1011                 self.logger.error(ppp("Received unexpected packet:", p))
1012                 count += 1
1013             except CaptureTimeoutError:
1014                 pass
1015         events = self.vapi.collect_events()
1016         for e in events:
1017             self.logger.error("Received unexpected event: %s", e)
1018         self.assert_equal(count, 0, "number of packets received")
1019         self.assert_equal(len(events), 0, "number of events received")
1020
1021     def test_echo_looped_back(self):
1022         """ echo packets looped back """
1023         # don't need a session in this case..
1024         self.vpp_session.remove_vpp_config()
1025         self.pg0.enable_capture()
1026         echo_packet_count = 10
1027         # random source port low enough to increment a few times..
1028         udp_sport_tx = randint(1, 50000)
1029         udp_sport_rx = udp_sport_tx
1030         echo_packet = (Ether(src=self.pg0.remote_mac,
1031                              dst=self.pg0.local_mac) /
1032                        IP(src=self.pg0.remote_ip4,
1033                           dst=self.pg0.remote_ip4) /
1034                        UDP(dport=BFD.udp_dport_echo) /
1035                        Raw("this should be looped back"))
1036         for dummy in range(echo_packet_count):
1037             self.sleep(.01, "delay between echo packets")
1038             echo_packet[UDP].sport = udp_sport_tx
1039             udp_sport_tx += 1
1040             self.logger.debug(ppp("Sending packet:", echo_packet))
1041             self.pg0.add_stream(echo_packet)
1042             self.pg_start()
1043         for dummy in range(echo_packet_count):
1044             p = self.pg0.wait_for_packet(1)
1045             self.logger.debug(ppp("Got packet:", p))
1046             ether = p[Ether]
1047             self.assert_equal(self.pg0.remote_mac,
1048                               ether.dst, "Destination MAC")
1049             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1050             ip = p[IP]
1051             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1052             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1053             udp = p[UDP]
1054             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1055                               "UDP destination port")
1056             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1057             udp_sport_rx += 1
1058             # need to compare the hex payload here, otherwise BFD_vpp_echo
1059             # gets in way
1060             self.assertEqual(str(p[UDP].payload),
1061                              str(echo_packet[UDP].payload),
1062                              "Received packet is not the echo packet sent")
1063         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1064                           "ECHO packet identifier for test purposes)")
1065
1066     def test_echo(self):
1067         """ echo function """
1068         bfd_session_up(self)
1069         self.test_session.update(required_min_echo_rx=150000)
1070         self.test_session.send_packet()
1071         detection_time = self.test_session.detect_mult *\
1072             self.vpp_session.required_min_rx / USEC_IN_SEC
1073         # echo shouldn't work without echo source set
1074         for dummy in range(10):
1075             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1076             self.sleep(sleep, "delay before sending bfd packet")
1077             self.test_session.send_packet()
1078         p = wait_for_bfd_packet(
1079             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1080         self.assert_equal(p[BFD].required_min_rx_interval,
1081                           self.vpp_session.required_min_rx,
1082                           "BFD required min rx interval")
1083         self.test_session.send_packet()
1084         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1085         echo_seen = False
1086         # should be turned on - loopback echo packets
1087         for dummy in range(3):
1088             loop_until = time.time() + 0.75 * detection_time
1089             while time.time() < loop_until:
1090                 p = self.pg0.wait_for_packet(1)
1091                 self.logger.debug(ppp("Got packet:", p))
1092                 if p[UDP].dport == BFD.udp_dport_echo:
1093                     self.assert_equal(
1094                         p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1095                     self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1096                                         "BFD ECHO src IP equal to loopback IP")
1097                     self.logger.debug(ppp("Looping back packet:", p))
1098                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1099                                       "ECHO packet destination MAC address")
1100                     p[Ether].dst = self.pg0.local_mac
1101                     self.pg0.add_stream(p)
1102                     self.pg_start()
1103                     echo_seen = True
1104                 elif p.haslayer(BFD):
1105                     if echo_seen:
1106                         self.assertGreaterEqual(
1107                             p[BFD].required_min_rx_interval,
1108                             1000000)
1109                     if "P" in p.sprintf("%BFD.flags%"):
1110                         final = self.test_session.create_packet()
1111                         final[BFD].flags = "F"
1112                         self.test_session.send_packet(final)
1113                 else:
1114                     raise Exception(ppp("Received unknown packet:", p))
1115
1116                 self.assert_equal(len(self.vapi.collect_events()), 0,
1117                                   "number of bfd events")
1118             self.test_session.send_packet()
1119         self.assertTrue(echo_seen, "No echo packets received")
1120
1121     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1122     def test_echo_fail(self):
1123         """ session goes down if echo function fails """
1124         bfd_session_up(self)
1125         self.test_session.update(required_min_echo_rx=150000)
1126         self.test_session.send_packet()
1127         detection_time = self.test_session.detect_mult *\
1128             self.vpp_session.required_min_rx / USEC_IN_SEC
1129         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1130         # echo function should be used now, but we will drop the echo packets
1131         verified_diag = False
1132         for dummy in range(3):
1133             loop_until = time.time() + 0.75 * detection_time
1134             while time.time() < loop_until:
1135                 p = self.pg0.wait_for_packet(1)
1136                 self.logger.debug(ppp("Got packet:", p))
1137                 if p[UDP].dport == BFD.udp_dport_echo:
1138                     # dropped
1139                     pass
1140                 elif p.haslayer(BFD):
1141                     if "P" in p.sprintf("%BFD.flags%"):
1142                         self.assertGreaterEqual(
1143                             p[BFD].required_min_rx_interval,
1144                             1000000)
1145                         final = self.test_session.create_packet()
1146                         final[BFD].flags = "F"
1147                         self.test_session.send_packet(final)
1148                     if p[BFD].state == BFDState.down:
1149                         self.assert_equal(p[BFD].diag,
1150                                           BFDDiagCode.echo_function_failed,
1151                                           BFDDiagCode)
1152                         verified_diag = True
1153                 else:
1154                     raise Exception(ppp("Received unknown packet:", p))
1155             self.test_session.send_packet()
1156         events = self.vapi.collect_events()
1157         self.assert_equal(len(events), 1, "number of bfd events")
1158         self.assert_equal(events[0].state, BFDState.down, BFDState)
1159         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1160
1161     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1162     def test_echo_stop(self):
1163         """ echo function stops if peer sets required min echo rx zero """
1164         bfd_session_up(self)
1165         self.test_session.update(required_min_echo_rx=150000)
1166         self.test_session.send_packet()
1167         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1168         # wait for first echo packet
1169         while True:
1170             p = self.pg0.wait_for_packet(1)
1171             self.logger.debug(ppp("Got packet:", p))
1172             if p[UDP].dport == BFD.udp_dport_echo:
1173                 self.logger.debug(ppp("Looping back packet:", p))
1174                 p[Ether].dst = self.pg0.local_mac
1175                 self.pg0.add_stream(p)
1176                 self.pg_start()
1177                 break
1178             elif p.haslayer(BFD):
1179                 # ignore BFD
1180                 pass
1181             else:
1182                 raise Exception(ppp("Received unknown packet:", p))
1183         self.test_session.update(required_min_echo_rx=0)
1184         self.test_session.send_packet()
1185         # echo packets shouldn't arrive anymore
1186         for dummy in range(5):
1187             wait_for_bfd_packet(
1188                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1189             self.test_session.send_packet()
1190             events = self.vapi.collect_events()
1191             self.assert_equal(len(events), 0, "number of bfd events")
1192
1193     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1194     def test_echo_source_removed(self):
1195         """ echo function stops if echo source is removed """
1196         bfd_session_up(self)
1197         self.test_session.update(required_min_echo_rx=150000)
1198         self.test_session.send_packet()
1199         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1200         # wait for first echo packet
1201         while True:
1202             p = self.pg0.wait_for_packet(1)
1203             self.logger.debug(ppp("Got packet:", p))
1204             if p[UDP].dport == BFD.udp_dport_echo:
1205                 self.logger.debug(ppp("Looping back packet:", p))
1206                 p[Ether].dst = self.pg0.local_mac
1207                 self.pg0.add_stream(p)
1208                 self.pg_start()
1209                 break
1210             elif p.haslayer(BFD):
1211                 # ignore BFD
1212                 pass
1213             else:
1214                 raise Exception(ppp("Received unknown packet:", p))
1215         self.vapi.bfd_udp_del_echo_source()
1216         self.test_session.send_packet()
1217         # echo packets shouldn't arrive anymore
1218         for dummy in range(5):
1219             wait_for_bfd_packet(
1220                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1221             self.test_session.send_packet()
1222             events = self.vapi.collect_events()
1223             self.assert_equal(len(events), 0, "number of bfd events")
1224
1225     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1226     def test_stale_echo(self):
1227         """ stale echo packets don't keep a session up """
1228         bfd_session_up(self)
1229         self.test_session.update(required_min_echo_rx=150000)
1230         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1231         self.test_session.send_packet()
1232         # should be turned on - loopback echo packets
1233         echo_packet = None
1234         timeout_at = None
1235         timeout_ok = False
1236         for dummy in range(10 * self.vpp_session.detect_mult):
1237             p = self.pg0.wait_for_packet(1)
1238             if p[UDP].dport == BFD.udp_dport_echo:
1239                 if echo_packet is None:
1240                     self.logger.debug(ppp("Got first echo packet:", p))
1241                     echo_packet = p
1242                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1243                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1244                 else:
1245                     self.logger.debug(ppp("Got followup echo packet:", p))
1246                 self.logger.debug(ppp("Looping back first echo packet:", p))
1247                 echo_packet[Ether].dst = self.pg0.local_mac
1248                 self.pg0.add_stream(echo_packet)
1249                 self.pg_start()
1250             elif p.haslayer(BFD):
1251                 self.logger.debug(ppp("Got packet:", p))
1252                 if "P" in p.sprintf("%BFD.flags%"):
1253                     final = self.test_session.create_packet()
1254                     final[BFD].flags = "F"
1255                     self.test_session.send_packet(final)
1256                 if p[BFD].state == BFDState.down:
1257                     self.assertIsNotNone(
1258                         timeout_at,
1259                         "Session went down before first echo packet received")
1260                     now = time.time()
1261                     self.assertGreaterEqual(
1262                         now, timeout_at,
1263                         "Session timeout at %s, but is expected at %s" %
1264                         (now, timeout_at))
1265                     self.assert_equal(p[BFD].diag,
1266                                       BFDDiagCode.echo_function_failed,
1267                                       BFDDiagCode)
1268                     events = self.vapi.collect_events()
1269                     self.assert_equal(len(events), 1, "number of bfd events")
1270                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1271                     timeout_ok = True
1272                     break
1273             else:
1274                 raise Exception(ppp("Received unknown packet:", p))
1275             self.test_session.send_packet()
1276         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1277
1278     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1279     def test_invalid_echo_checksum(self):
1280         """ echo packets with invalid checksum don't keep a session up """
1281         bfd_session_up(self)
1282         self.test_session.update(required_min_echo_rx=150000)
1283         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1284         self.test_session.send_packet()
1285         # should be turned on - loopback echo packets
1286         timeout_at = None
1287         timeout_ok = False
1288         for dummy in range(10 * self.vpp_session.detect_mult):
1289             p = self.pg0.wait_for_packet(1)
1290             if p[UDP].dport == BFD.udp_dport_echo:
1291                 self.logger.debug(ppp("Got echo packet:", p))
1292                 if timeout_at is None:
1293                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1294                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1295                 p[BFD_vpp_echo].checksum = getrandbits(64)
1296                 p[Ether].dst = self.pg0.local_mac
1297                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1298                 self.pg0.add_stream(p)
1299                 self.pg_start()
1300             elif p.haslayer(BFD):
1301                 self.logger.debug(ppp("Got packet:", p))
1302                 if "P" in p.sprintf("%BFD.flags%"):
1303                     final = self.test_session.create_packet()
1304                     final[BFD].flags = "F"
1305                     self.test_session.send_packet(final)
1306                 if p[BFD].state == BFDState.down:
1307                     self.assertIsNotNone(
1308                         timeout_at,
1309                         "Session went down before first echo packet received")
1310                     now = time.time()
1311                     self.assertGreaterEqual(
1312                         now, timeout_at,
1313                         "Session timeout at %s, but is expected at %s" %
1314                         (now, timeout_at))
1315                     self.assert_equal(p[BFD].diag,
1316                                       BFDDiagCode.echo_function_failed,
1317                                       BFDDiagCode)
1318                     events = self.vapi.collect_events()
1319                     self.assert_equal(len(events), 1, "number of bfd events")
1320                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1321                     timeout_ok = True
1322                     break
1323             else:
1324                 raise Exception(ppp("Received unknown packet:", p))
1325             self.test_session.send_packet()
1326         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1327
1328     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1329     def test_admin_up_down(self):
1330         """ put session admin-up and admin-down """
1331         bfd_session_up(self)
1332         self.vpp_session.admin_down()
1333         self.pg0.enable_capture()
1334         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1335         verify_event(self, e, expected_state=BFDState.admin_down)
1336         for dummy in range(2):
1337             p = wait_for_bfd_packet(self)
1338             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1339         # try to bring session up - shouldn't be possible
1340         self.test_session.update(state=BFDState.init)
1341         self.test_session.send_packet()
1342         for dummy in range(2):
1343             p = wait_for_bfd_packet(self)
1344             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1345         self.vpp_session.admin_up()
1346         self.test_session.update(state=BFDState.down)
1347         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1348         verify_event(self, e, expected_state=BFDState.down)
1349         p = wait_for_bfd_packet(
1350             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1351         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1352         self.test_session.send_packet()
1353         p = wait_for_bfd_packet(
1354             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1355         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1356         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1357         verify_event(self, e, expected_state=BFDState.init)
1358         self.test_session.update(state=BFDState.up)
1359         self.test_session.send_packet()
1360         p = wait_for_bfd_packet(
1361             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1362         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1363         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1364         verify_event(self, e, expected_state=BFDState.up)
1365
1366     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1367     def test_config_change_remote_demand(self):
1368         """ configuration change while peer in demand mode """
1369         bfd_session_up(self)
1370         demand = self.test_session.create_packet()
1371         demand[BFD].flags = "D"
1372         self.test_session.send_packet(demand)
1373         self.vpp_session.modify_parameters(
1374             required_min_rx=2 * self.vpp_session.required_min_rx)
1375         p = wait_for_bfd_packet(
1376             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1377         # poll bit must be set
1378         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1379         # terminate poll sequence
1380         final = self.test_session.create_packet()
1381         final[BFD].flags = "D+F"
1382         self.test_session.send_packet(final)
1383         # vpp should be quiet now again
1384         transmit_time = 0.9 \
1385             * max(self.vpp_session.required_min_rx,
1386                   self.test_session.desired_min_tx) \
1387             / USEC_IN_SEC
1388         count = 0
1389         for dummy in range(self.test_session.detect_mult * 2):
1390             time.sleep(transmit_time)
1391             self.test_session.send_packet(demand)
1392             try:
1393                 p = wait_for_bfd_packet(self, timeout=0)
1394                 self.logger.error(ppp("Received unexpected packet:", p))
1395                 count += 1
1396             except CaptureTimeoutError:
1397                 pass
1398         events = self.vapi.collect_events()
1399         for e in events:
1400             self.logger.error("Received unexpected event: %s", e)
1401         self.assert_equal(count, 0, "number of packets received")
1402         self.assert_equal(len(events), 0, "number of events received")
1403
1404
1405 class BFD6TestCase(VppTestCase):
1406     """Bidirectional Forwarding Detection (BFD) (IPv6) """
1407
1408     pg0 = None
1409     vpp_clock_offset = None
1410     vpp_session = None
1411     test_session = None
1412
1413     @classmethod
1414     def setUpClass(cls):
1415         super(BFD6TestCase, cls).setUpClass()
1416         try:
1417             cls.create_pg_interfaces([0])
1418             cls.pg0.config_ip6()
1419             cls.pg0.configure_ipv6_neighbors()
1420             cls.pg0.admin_up()
1421             cls.pg0.resolve_ndp()
1422             cls.create_loopback_interfaces([0])
1423             cls.loopback0 = cls.lo_interfaces[0]
1424             cls.loopback0.config_ip6()
1425             cls.loopback0.admin_up()
1426
1427         except Exception:
1428             super(BFD6TestCase, cls).tearDownClass()
1429             raise
1430
1431     def setUp(self):
1432         super(BFD6TestCase, self).setUp()
1433         self.factory = AuthKeyFactory()
1434         self.vapi.want_bfd_events()
1435         self.pg0.enable_capture()
1436         try:
1437             self.vpp_session = VppBFDUDPSession(self, self.pg0,
1438                                                 self.pg0.remote_ip6,
1439                                                 af=AF_INET6)
1440             self.vpp_session.add_vpp_config()
1441             self.vpp_session.admin_up()
1442             self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1443             self.logger.debug(self.vapi.cli("show adj nbr"))
1444         except:
1445             self.vapi.want_bfd_events(enable_disable=0)
1446             raise
1447
1448     def tearDown(self):
1449         if not self.vpp_dead:
1450             self.vapi.want_bfd_events(enable_disable=0)
1451         self.vapi.collect_events()  # clear the event queue
1452         super(BFD6TestCase, self).tearDown()
1453
1454     def test_session_up(self):
1455         """ bring BFD session up """
1456         bfd_session_up(self)
1457
1458     def test_session_up_by_ip(self):
1459         """ bring BFD session up - first frame looked up by address pair """
1460         self.logger.info("BFD: Sending Slow control frame")
1461         self.test_session.update(my_discriminator=randint(0, 40000000))
1462         self.test_session.send_packet()
1463         self.pg0.enable_capture()
1464         p = self.pg0.wait_for_packet(1)
1465         self.assert_equal(p[BFD].your_discriminator,
1466                           self.test_session.my_discriminator,
1467                           "BFD - your discriminator")
1468         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1469         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1470                                  state=BFDState.up)
1471         self.logger.info("BFD: Waiting for event")
1472         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1473         verify_event(self, e, expected_state=BFDState.init)
1474         self.logger.info("BFD: Sending Up")
1475         self.test_session.send_packet()
1476         self.logger.info("BFD: Waiting for event")
1477         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1478         verify_event(self, e, expected_state=BFDState.up)
1479         self.logger.info("BFD: Session is Up")
1480         self.test_session.update(state=BFDState.up)
1481         self.test_session.send_packet()
1482         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1483
1484     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1485     def test_hold_up(self):
1486         """ hold BFD session up """
1487         bfd_session_up(self)
1488         for dummy in range(self.test_session.detect_mult * 2):
1489             wait_for_bfd_packet(self)
1490             self.test_session.send_packet()
1491         self.assert_equal(len(self.vapi.collect_events()), 0,
1492                           "number of bfd events")
1493         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1494
1495     def test_echo_looped_back(self):
1496         """ echo packets looped back """
1497         # don't need a session in this case..
1498         self.vpp_session.remove_vpp_config()
1499         self.pg0.enable_capture()
1500         echo_packet_count = 10
1501         # random source port low enough to increment a few times..
1502         udp_sport_tx = randint(1, 50000)
1503         udp_sport_rx = udp_sport_tx
1504         echo_packet = (Ether(src=self.pg0.remote_mac,
1505                              dst=self.pg0.local_mac) /
1506                        IPv6(src=self.pg0.remote_ip6,
1507                             dst=self.pg0.remote_ip6) /
1508                        UDP(dport=BFD.udp_dport_echo) /
1509                        Raw("this should be looped back"))
1510         for dummy in range(echo_packet_count):
1511             self.sleep(.01, "delay between echo packets")
1512             echo_packet[UDP].sport = udp_sport_tx
1513             udp_sport_tx += 1
1514             self.logger.debug(ppp("Sending packet:", echo_packet))
1515             self.pg0.add_stream(echo_packet)
1516             self.pg_start()
1517         for dummy in range(echo_packet_count):
1518             p = self.pg0.wait_for_packet(1)
1519             self.logger.debug(ppp("Got packet:", p))
1520             ether = p[Ether]
1521             self.assert_equal(self.pg0.remote_mac,
1522                               ether.dst, "Destination MAC")
1523             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1524             ip = p[IPv6]
1525             self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1526             self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1527             udp = p[UDP]
1528             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1529                               "UDP destination port")
1530             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1531             udp_sport_rx += 1
1532             # need to compare the hex payload here, otherwise BFD_vpp_echo
1533             # gets in way
1534             self.assertEqual(str(p[UDP].payload),
1535                              str(echo_packet[UDP].payload),
1536                              "Received packet is not the echo packet sent")
1537         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1538                           "ECHO packet identifier for test purposes)")
1539         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1540                           "ECHO packet identifier for test purposes)")
1541
1542     def test_echo(self):
1543         """ echo function """
1544         bfd_session_up(self)
1545         self.test_session.update(required_min_echo_rx=150000)
1546         self.test_session.send_packet()
1547         detection_time = self.test_session.detect_mult *\
1548             self.vpp_session.required_min_rx / USEC_IN_SEC
1549         # echo shouldn't work without echo source set
1550         for dummy in range(10):
1551             sleep = self.vpp_session.required_min_rx / USEC_IN_SEC
1552             self.sleep(sleep, "delay before sending bfd packet")
1553             self.test_session.send_packet()
1554         p = wait_for_bfd_packet(
1555             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1556         self.assert_equal(p[BFD].required_min_rx_interval,
1557                           self.vpp_session.required_min_rx,
1558                           "BFD required min rx interval")
1559         self.test_session.send_packet()
1560         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1561         echo_seen = False
1562         # should be turned on - loopback echo packets
1563         for dummy in range(3):
1564             loop_until = time.time() + 0.75 * detection_time
1565             while time.time() < loop_until:
1566                 p = self.pg0.wait_for_packet(1)
1567                 self.logger.debug(ppp("Got packet:", p))
1568                 if p[UDP].dport == BFD.udp_dport_echo:
1569                     self.assert_equal(
1570                         p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1571                     self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1572                                         "BFD ECHO src IP equal to loopback IP")
1573                     self.logger.debug(ppp("Looping back packet:", p))
1574                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1575                                       "ECHO packet destination MAC address")
1576                     p[Ether].dst = self.pg0.local_mac
1577                     self.pg0.add_stream(p)
1578                     self.pg_start()
1579                     echo_seen = True
1580                 elif p.haslayer(BFD):
1581                     if echo_seen:
1582                         self.assertGreaterEqual(
1583                             p[BFD].required_min_rx_interval,
1584                             1000000)
1585                     if "P" in p.sprintf("%BFD.flags%"):
1586                         final = self.test_session.create_packet()
1587                         final[BFD].flags = "F"
1588                         self.test_session.send_packet(final)
1589                 else:
1590                     raise Exception(ppp("Received unknown packet:", p))
1591
1592                 self.assert_equal(len(self.vapi.collect_events()), 0,
1593                                   "number of bfd events")
1594             self.test_session.send_packet()
1595         self.assertTrue(echo_seen, "No echo packets received")
1596
1597
1598 class BFDFIBTestCase(VppTestCase):
1599     """ BFD-FIB interactions (IPv6) """
1600
1601     vpp_session = None
1602     test_session = None
1603
1604     def setUp(self):
1605         super(BFDFIBTestCase, self).setUp()
1606         self.create_pg_interfaces(range(1))
1607
1608         self.vapi.want_bfd_events()
1609         self.pg0.enable_capture()
1610
1611         for i in self.pg_interfaces:
1612             i.admin_up()
1613             i.config_ip6()
1614             i.configure_ipv6_neighbors()
1615
1616     def tearDown(self):
1617         if not self.vpp_dead:
1618             self.vapi.want_bfd_events(enable_disable=0)
1619
1620         super(BFDFIBTestCase, self).tearDown()
1621
1622     @staticmethod
1623     def pkt_is_not_data_traffic(p):
1624         """ not data traffic implies BFD or the usual IPv6 ND/RA"""
1625         if p.haslayer(BFD) or is_ipv6_misc(p):
1626             return True
1627         return False
1628
1629     def test_session_with_fib(self):
1630         """ BFD-FIB interactions """
1631
1632         # packets to match against both of the routes
1633         p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1634               IPv6(src="3001::1", dst="2001::1") /
1635               UDP(sport=1234, dport=1234) /
1636               Raw('\xa5' * 100)),
1637              (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1638               IPv6(src="3001::1", dst="2002::1") /
1639               UDP(sport=1234, dport=1234) /
1640               Raw('\xa5' * 100))]
1641
1642         # A recursive and a non-recursive route via a next-hop that
1643         # will have a BFD session
1644         ip_2001_s_64 = VppIpRoute(self, "2001::", 64,
1645                                   [VppRoutePath(self.pg0.remote_ip6,
1646                                                 self.pg0.sw_if_index,
1647                                                 is_ip6=1)],
1648                                   is_ip6=1)
1649         ip_2002_s_64 = VppIpRoute(self, "2002::", 64,
1650                                   [VppRoutePath(self.pg0.remote_ip6,
1651                                                 0xffffffff,
1652                                                 is_ip6=1)],
1653                                   is_ip6=1)
1654         ip_2001_s_64.add_vpp_config()
1655         ip_2002_s_64.add_vpp_config()
1656
1657         # bring the session up now the routes are present
1658         self.vpp_session = VppBFDUDPSession(self,
1659                                             self.pg0,
1660                                             self.pg0.remote_ip6,
1661                                             af=AF_INET6)
1662         self.vpp_session.add_vpp_config()
1663         self.vpp_session.admin_up()
1664         self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1665
1666         # session is up - traffic passes
1667         bfd_session_up(self)
1668
1669         self.pg0.add_stream(p)
1670         self.pg_start()
1671         for packet in p:
1672             captured = self.pg0.wait_for_packet(
1673                 1,
1674                 filter_out_fn=self.pkt_is_not_data_traffic)
1675             self.assertEqual(captured[IPv6].dst,
1676                              packet[IPv6].dst)
1677
1678         # session is up - traffic is dropped
1679         bfd_session_down(self)
1680
1681         self.pg0.add_stream(p)
1682         self.pg_start()
1683         with self.assertRaises(CaptureTimeoutError):
1684             self.pg0.wait_for_packet(1, self.pkt_is_not_data_traffic)
1685
1686         # session is up - traffic passes
1687         bfd_session_up(self)
1688
1689         self.pg0.add_stream(p)
1690         self.pg_start()
1691         for packet in p:
1692             captured = self.pg0.wait_for_packet(
1693                 1,
1694                 filter_out_fn=self.pkt_is_not_data_traffic)
1695             self.assertEqual(captured[IPv6].dst,
1696                              packet[IPv6].dst)
1697
1698
1699 class BFDSHA1TestCase(VppTestCase):
1700     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1701
1702     pg0 = None
1703     vpp_clock_offset = None
1704     vpp_session = None
1705     test_session = None
1706
1707     @classmethod
1708     def setUpClass(cls):
1709         super(BFDSHA1TestCase, cls).setUpClass()
1710         try:
1711             cls.create_pg_interfaces([0])
1712             cls.pg0.config_ip4()
1713             cls.pg0.admin_up()
1714             cls.pg0.resolve_arp()
1715
1716         except Exception:
1717             super(BFDSHA1TestCase, cls).tearDownClass()
1718             raise
1719
1720     def setUp(self):
1721         super(BFDSHA1TestCase, self).setUp()
1722         self.factory = AuthKeyFactory()
1723         self.vapi.want_bfd_events()
1724         self.pg0.enable_capture()
1725
1726     def tearDown(self):
1727         if not self.vpp_dead:
1728             self.vapi.want_bfd_events(enable_disable=0)
1729         self.vapi.collect_events()  # clear the event queue
1730         super(BFDSHA1TestCase, self).tearDown()
1731
1732     def test_session_up(self):
1733         """ bring BFD session up """
1734         key = self.factory.create_random_key(self)
1735         key.add_vpp_config()
1736         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1737                                             self.pg0.remote_ip4,
1738                                             sha1_key=key)
1739         self.vpp_session.add_vpp_config()
1740         self.vpp_session.admin_up()
1741         self.test_session = BFDTestSession(
1742             self, self.pg0, AF_INET, sha1_key=key,
1743             bfd_key_id=self.vpp_session.bfd_key_id)
1744         bfd_session_up(self)
1745
1746     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1747     def test_hold_up(self):
1748         """ hold BFD session up """
1749         key = self.factory.create_random_key(self)
1750         key.add_vpp_config()
1751         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1752                                             self.pg0.remote_ip4,
1753                                             sha1_key=key)
1754         self.vpp_session.add_vpp_config()
1755         self.vpp_session.admin_up()
1756         self.test_session = BFDTestSession(
1757             self, self.pg0, AF_INET, sha1_key=key,
1758             bfd_key_id=self.vpp_session.bfd_key_id)
1759         bfd_session_up(self)
1760         for dummy in range(self.test_session.detect_mult * 2):
1761             wait_for_bfd_packet(self)
1762             self.test_session.send_packet()
1763         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1764
1765     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1766     def test_hold_up_meticulous(self):
1767         """ hold BFD session up - meticulous auth """
1768         key = self.factory.create_random_key(
1769             self, BFDAuthType.meticulous_keyed_sha1)
1770         key.add_vpp_config()
1771         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1772                                             self.pg0.remote_ip4, sha1_key=key)
1773         self.vpp_session.add_vpp_config()
1774         self.vpp_session.admin_up()
1775         # specify sequence number so that it wraps
1776         self.test_session = BFDTestSession(
1777             self, self.pg0, AF_INET, sha1_key=key,
1778             bfd_key_id=self.vpp_session.bfd_key_id,
1779             our_seq_number=0xFFFFFFFF - 4)
1780         bfd_session_up(self)
1781         for dummy in range(30):
1782             wait_for_bfd_packet(self)
1783             self.test_session.inc_seq_num()
1784             self.test_session.send_packet()
1785         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1786
1787     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1788     def test_send_bad_seq_number(self):
1789         """ session is not kept alive by msgs with bad sequence numbers"""
1790         key = self.factory.create_random_key(
1791             self, BFDAuthType.meticulous_keyed_sha1)
1792         key.add_vpp_config()
1793         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1794                                             self.pg0.remote_ip4, sha1_key=key)
1795         self.vpp_session.add_vpp_config()
1796         self.test_session = BFDTestSession(
1797             self, self.pg0, AF_INET, sha1_key=key,
1798             bfd_key_id=self.vpp_session.bfd_key_id)
1799         bfd_session_up(self)
1800         detection_time = self.test_session.detect_mult *\
1801             self.vpp_session.required_min_rx / USEC_IN_SEC
1802         send_until = time.time() + 2 * detection_time
1803         while time.time() < send_until:
1804             self.test_session.send_packet()
1805             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1806                        "time between bfd packets")
1807         e = self.vapi.collect_events()
1808         # session should be down now, because the sequence numbers weren't
1809         # updated
1810         self.assert_equal(len(e), 1, "number of bfd events")
1811         verify_event(self, e[0], expected_state=BFDState.down)
1812
1813     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1814                                        legitimate_test_session,
1815                                        rogue_test_session,
1816                                        rogue_bfd_values=None):
1817         """ execute a rogue session interaction scenario
1818
1819         1. create vpp session, add config
1820         2. bring the legitimate session up
1821         3. copy the bfd values from legitimate session to rogue session
1822         4. apply rogue_bfd_values to rogue session
1823         5. set rogue session state to down
1824         6. send message to take the session down from the rogue session
1825         7. assert that the legitimate session is unaffected
1826         """
1827
1828         self.vpp_session = vpp_bfd_udp_session
1829         self.vpp_session.add_vpp_config()
1830         self.test_session = legitimate_test_session
1831         # bring vpp session up
1832         bfd_session_up(self)
1833         # send packet from rogue session
1834         rogue_test_session.update(
1835             my_discriminator=self.test_session.my_discriminator,
1836             your_discriminator=self.test_session.your_discriminator,
1837             desired_min_tx=self.test_session.desired_min_tx,
1838             required_min_rx=self.test_session.required_min_rx,
1839             detect_mult=self.test_session.detect_mult,
1840             diag=self.test_session.diag,
1841             state=self.test_session.state,
1842             auth_type=self.test_session.auth_type)
1843         if rogue_bfd_values:
1844             rogue_test_session.update(**rogue_bfd_values)
1845         rogue_test_session.update(state=BFDState.down)
1846         rogue_test_session.send_packet()
1847         wait_for_bfd_packet(self)
1848         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1849
1850     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1851     def test_mismatch_auth(self):
1852         """ session is not brought down by unauthenticated msg """
1853         key = self.factory.create_random_key(self)
1854         key.add_vpp_config()
1855         vpp_session = VppBFDUDPSession(
1856             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1857         legitimate_test_session = BFDTestSession(
1858             self, self.pg0, AF_INET, sha1_key=key,
1859             bfd_key_id=vpp_session.bfd_key_id)
1860         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
1861         self.execute_rogue_session_scenario(vpp_session,
1862                                             legitimate_test_session,
1863                                             rogue_test_session)
1864
1865     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1866     def test_mismatch_bfd_key_id(self):
1867         """ session is not brought down by msg with non-existent key-id """
1868         key = self.factory.create_random_key(self)
1869         key.add_vpp_config()
1870         vpp_session = VppBFDUDPSession(
1871             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1872         # pick a different random bfd key id
1873         x = randint(0, 255)
1874         while x == vpp_session.bfd_key_id:
1875             x = randint(0, 255)
1876         legitimate_test_session = BFDTestSession(
1877             self, self.pg0, AF_INET, sha1_key=key,
1878             bfd_key_id=vpp_session.bfd_key_id)
1879         rogue_test_session = BFDTestSession(
1880             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
1881         self.execute_rogue_session_scenario(vpp_session,
1882                                             legitimate_test_session,
1883                                             rogue_test_session)
1884
1885     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1886     def test_mismatched_auth_type(self):
1887         """ session is not brought down by msg with wrong auth type """
1888         key = self.factory.create_random_key(self)
1889         key.add_vpp_config()
1890         vpp_session = VppBFDUDPSession(
1891             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1892         legitimate_test_session = BFDTestSession(
1893             self, self.pg0, AF_INET, sha1_key=key,
1894             bfd_key_id=vpp_session.bfd_key_id)
1895         rogue_test_session = BFDTestSession(
1896             self, self.pg0, AF_INET, sha1_key=key,
1897             bfd_key_id=vpp_session.bfd_key_id)
1898         self.execute_rogue_session_scenario(
1899             vpp_session, legitimate_test_session, rogue_test_session,
1900             {'auth_type': BFDAuthType.keyed_md5})
1901
1902     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1903     def test_restart(self):
1904         """ simulate remote peer restart and resynchronization """
1905         key = self.factory.create_random_key(
1906             self, BFDAuthType.meticulous_keyed_sha1)
1907         key.add_vpp_config()
1908         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1909                                             self.pg0.remote_ip4, sha1_key=key)
1910         self.vpp_session.add_vpp_config()
1911         self.test_session = BFDTestSession(
1912             self, self.pg0, AF_INET, sha1_key=key,
1913             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
1914         bfd_session_up(self)
1915         # don't send any packets for 2*detection_time
1916         detection_time = self.test_session.detect_mult *\
1917             self.vpp_session.required_min_rx / USEC_IN_SEC
1918         self.sleep(2 * detection_time, "simulating peer restart")
1919         events = self.vapi.collect_events()
1920         self.assert_equal(len(events), 1, "number of bfd events")
1921         verify_event(self, events[0], expected_state=BFDState.down)
1922         self.test_session.update(state=BFDState.down)
1923         # reset sequence number
1924         self.test_session.our_seq_number = 0
1925         self.test_session.vpp_seq_number = None
1926         # now throw away any pending packets
1927         self.pg0.enable_capture()
1928         bfd_session_up(self)
1929
1930
1931 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1932 class BFDAuthOnOffTestCase(VppTestCase):
1933     """Bidirectional Forwarding Detection (BFD) (changing auth) """
1934
1935     pg0 = None
1936     vpp_session = None
1937     test_session = None
1938
1939     @classmethod
1940     def setUpClass(cls):
1941         super(BFDAuthOnOffTestCase, cls).setUpClass()
1942         try:
1943             cls.create_pg_interfaces([0])
1944             cls.pg0.config_ip4()
1945             cls.pg0.admin_up()
1946             cls.pg0.resolve_arp()
1947
1948         except Exception:
1949             super(BFDAuthOnOffTestCase, cls).tearDownClass()
1950             raise
1951
1952     def setUp(self):
1953         super(BFDAuthOnOffTestCase, self).setUp()
1954         self.factory = AuthKeyFactory()
1955         self.vapi.want_bfd_events()
1956         self.pg0.enable_capture()
1957
1958     def tearDown(self):
1959         if not self.vpp_dead:
1960             self.vapi.want_bfd_events(enable_disable=0)
1961         self.vapi.collect_events()  # clear the event queue
1962         super(BFDAuthOnOffTestCase, self).tearDown()
1963
1964     def test_auth_on_immediate(self):
1965         """ turn auth on without disturbing session state (immediate) """
1966         key = self.factory.create_random_key(self)
1967         key.add_vpp_config()
1968         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1969                                             self.pg0.remote_ip4)
1970         self.vpp_session.add_vpp_config()
1971         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
1972         bfd_session_up(self)
1973         for dummy in range(self.test_session.detect_mult * 2):
1974             p = wait_for_bfd_packet(self)
1975             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1976             self.test_session.send_packet()
1977         self.vpp_session.activate_auth(key)
1978         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1979         self.test_session.sha1_key = key
1980         for dummy in range(self.test_session.detect_mult * 2):
1981             p = wait_for_bfd_packet(self)
1982             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1983             self.test_session.send_packet()
1984         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1985         self.assert_equal(len(self.vapi.collect_events()), 0,
1986                           "number of bfd events")
1987
1988     def test_auth_off_immediate(self):
1989         """ turn auth off without disturbing session state (immediate) """
1990         key = self.factory.create_random_key(self)
1991         key.add_vpp_config()
1992         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1993                                             self.pg0.remote_ip4, sha1_key=key)
1994         self.vpp_session.add_vpp_config()
1995         self.test_session = BFDTestSession(
1996             self, self.pg0, AF_INET, sha1_key=key,
1997             bfd_key_id=self.vpp_session.bfd_key_id)
1998         bfd_session_up(self)
1999         # self.vapi.want_bfd_events(enable_disable=0)
2000         for dummy in range(self.test_session.detect_mult * 2):
2001             p = wait_for_bfd_packet(self)
2002             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2003             self.test_session.inc_seq_num()
2004             self.test_session.send_packet()
2005         self.vpp_session.deactivate_auth()
2006         self.test_session.bfd_key_id = None
2007         self.test_session.sha1_key = None
2008         for dummy in range(self.test_session.detect_mult * 2):
2009             p = wait_for_bfd_packet(self)
2010             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2011             self.test_session.inc_seq_num()
2012             self.test_session.send_packet()
2013         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2014         self.assert_equal(len(self.vapi.collect_events()), 0,
2015                           "number of bfd events")
2016
2017     def test_auth_change_key_immediate(self):
2018         """ change auth key without disturbing session state (immediate) """
2019         key1 = self.factory.create_random_key(self)
2020         key1.add_vpp_config()
2021         key2 = self.factory.create_random_key(self)
2022         key2.add_vpp_config()
2023         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2024                                             self.pg0.remote_ip4, sha1_key=key1)
2025         self.vpp_session.add_vpp_config()
2026         self.test_session = BFDTestSession(
2027             self, self.pg0, AF_INET, sha1_key=key1,
2028             bfd_key_id=self.vpp_session.bfd_key_id)
2029         bfd_session_up(self)
2030         for dummy in range(self.test_session.detect_mult * 2):
2031             p = wait_for_bfd_packet(self)
2032             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2033             self.test_session.send_packet()
2034         self.vpp_session.activate_auth(key2)
2035         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2036         self.test_session.sha1_key = key2
2037         for dummy in range(self.test_session.detect_mult * 2):
2038             p = wait_for_bfd_packet(self)
2039             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2040             self.test_session.send_packet()
2041         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2042         self.assert_equal(len(self.vapi.collect_events()), 0,
2043                           "number of bfd events")
2044
2045     def test_auth_on_delayed(self):
2046         """ turn auth on without disturbing session state (delayed) """
2047         key = self.factory.create_random_key(self)
2048         key.add_vpp_config()
2049         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2050                                             self.pg0.remote_ip4)
2051         self.vpp_session.add_vpp_config()
2052         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
2053         bfd_session_up(self)
2054         for dummy in range(self.test_session.detect_mult * 2):
2055             wait_for_bfd_packet(self)
2056             self.test_session.send_packet()
2057         self.vpp_session.activate_auth(key, delayed=True)
2058         for dummy in range(self.test_session.detect_mult * 2):
2059             p = wait_for_bfd_packet(self)
2060             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2061             self.test_session.send_packet()
2062         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2063         self.test_session.sha1_key = key
2064         self.test_session.send_packet()
2065         for dummy in range(self.test_session.detect_mult * 2):
2066             p = wait_for_bfd_packet(self)
2067             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2068             self.test_session.send_packet()
2069         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2070         self.assert_equal(len(self.vapi.collect_events()), 0,
2071                           "number of bfd events")
2072
2073     def test_auth_off_delayed(self):
2074         """ turn auth off without disturbing session state (delayed) """
2075         key = self.factory.create_random_key(self)
2076         key.add_vpp_config()
2077         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2078                                             self.pg0.remote_ip4, sha1_key=key)
2079         self.vpp_session.add_vpp_config()
2080         self.test_session = BFDTestSession(
2081             self, self.pg0, AF_INET, sha1_key=key,
2082             bfd_key_id=self.vpp_session.bfd_key_id)
2083         bfd_session_up(self)
2084         for dummy in range(self.test_session.detect_mult * 2):
2085             p = wait_for_bfd_packet(self)
2086             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2087             self.test_session.send_packet()
2088         self.vpp_session.deactivate_auth(delayed=True)
2089         for dummy in range(self.test_session.detect_mult * 2):
2090             p = wait_for_bfd_packet(self)
2091             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2092             self.test_session.send_packet()
2093         self.test_session.bfd_key_id = None
2094         self.test_session.sha1_key = None
2095         self.test_session.send_packet()
2096         for dummy in range(self.test_session.detect_mult * 2):
2097             p = wait_for_bfd_packet(self)
2098             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2099             self.test_session.send_packet()
2100         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2101         self.assert_equal(len(self.vapi.collect_events()), 0,
2102                           "number of bfd events")
2103
2104     def test_auth_change_key_delayed(self):
2105         """ change auth key without disturbing session state (delayed) """
2106         key1 = self.factory.create_random_key(self)
2107         key1.add_vpp_config()
2108         key2 = self.factory.create_random_key(self)
2109         key2.add_vpp_config()
2110         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2111                                             self.pg0.remote_ip4, sha1_key=key1)
2112         self.vpp_session.add_vpp_config()
2113         self.vpp_session.admin_up()
2114         self.test_session = BFDTestSession(
2115             self, self.pg0, AF_INET, sha1_key=key1,
2116             bfd_key_id=self.vpp_session.bfd_key_id)
2117         bfd_session_up(self)
2118         for dummy in range(self.test_session.detect_mult * 2):
2119             p = wait_for_bfd_packet(self)
2120             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2121             self.test_session.send_packet()
2122         self.vpp_session.activate_auth(key2, delayed=True)
2123         for dummy in range(self.test_session.detect_mult * 2):
2124             p = wait_for_bfd_packet(self)
2125             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2126             self.test_session.send_packet()
2127         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2128         self.test_session.sha1_key = key2
2129         self.test_session.send_packet()
2130         for dummy in range(self.test_session.detect_mult * 2):
2131             p = wait_for_bfd_packet(self)
2132             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2133             self.test_session.send_packet()
2134         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2135         self.assert_equal(len(self.vapi.collect_events()), 0,
2136                           "number of bfd events")
2137
2138
2139 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2140 class BFDCLITestCase(VppTestCase):
2141     """Bidirectional Forwarding Detection (BFD) (CLI) """
2142     pg0 = None
2143
2144     @classmethod
2145     def setUpClass(cls):
2146         super(BFDCLITestCase, cls).setUpClass()
2147
2148         try:
2149             cls.create_pg_interfaces((0,))
2150             cls.pg0.config_ip4()
2151             cls.pg0.config_ip6()
2152             cls.pg0.resolve_arp()
2153             cls.pg0.resolve_ndp()
2154
2155         except Exception:
2156             super(BFDCLITestCase, cls).tearDownClass()
2157             raise
2158
2159     def setUp(self):
2160         super(BFDCLITestCase, self).setUp()
2161         self.factory = AuthKeyFactory()
2162         self.pg0.enable_capture()
2163
2164     def tearDown(self):
2165         try:
2166             self.vapi.want_bfd_events(enable_disable=0)
2167         except UnexpectedApiReturnValueError:
2168             # some tests aren't subscribed, so this is not an issue
2169             pass
2170         self.vapi.collect_events()  # clear the event queue
2171         super(BFDCLITestCase, self).tearDown()
2172
2173     def cli_verify_no_response(self, cli):
2174         """ execute a CLI, asserting that the response is empty """
2175         self.assert_equal(self.vapi.cli(cli),
2176                           "",
2177                           "CLI command response")
2178
2179     def cli_verify_response(self, cli, expected):
2180         """ execute a CLI, asserting that the response matches expectation """
2181         self.assert_equal(self.vapi.cli(cli).strip(),
2182                           expected,
2183                           "CLI command response")
2184
2185     def test_show(self):
2186         """ show commands """
2187         k1 = self.factory.create_random_key(self)
2188         k1.add_vpp_config()
2189         k2 = self.factory.create_random_key(
2190             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2191         k2.add_vpp_config()
2192         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2193         s1.add_vpp_config()
2194         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2195                               sha1_key=k2)
2196         s2.add_vpp_config()
2197         self.logger.info(self.vapi.ppcli("show bfd keys"))
2198         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2199         self.logger.info(self.vapi.ppcli("show bfd"))
2200
2201     def test_set_del_sha1_key(self):
2202         """ set/delete SHA1 auth key """
2203         k = self.factory.create_random_key(self)
2204         self.registry.register(k, self.logger)
2205         self.cli_verify_no_response(
2206             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2207             (k.conf_key_id,
2208                 "".join("{:02x}".format(ord(c)) for c in k.key)))
2209         self.assertTrue(k.query_vpp_config())
2210         self.vpp_session = VppBFDUDPSession(
2211             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2212         self.vpp_session.add_vpp_config()
2213         self.test_session = \
2214             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2215                            bfd_key_id=self.vpp_session.bfd_key_id)
2216         self.vapi.want_bfd_events()
2217         bfd_session_up(self)
2218         bfd_session_down(self)
2219         # try to replace the secret for the key - should fail because the key
2220         # is in-use
2221         k2 = self.factory.create_random_key(self)
2222         self.cli_verify_response(
2223             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2224             (k.conf_key_id,
2225                 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2226             "bfd key set: `bfd_auth_set_key' API call failed, "
2227             "rv=-103:BFD object in use")
2228         # manipulating the session using old secret should still work
2229         bfd_session_up(self)
2230         bfd_session_down(self)
2231         self.vpp_session.remove_vpp_config()
2232         self.cli_verify_no_response(
2233             "bfd key del conf-key-id %s" % k.conf_key_id)
2234         self.assertFalse(k.query_vpp_config())
2235
2236     def test_set_del_meticulous_sha1_key(self):
2237         """ set/delete meticulous SHA1 auth key """
2238         k = self.factory.create_random_key(
2239             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2240         self.registry.register(k, self.logger)
2241         self.cli_verify_no_response(
2242             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2243             (k.conf_key_id,
2244                 "".join("{:02x}".format(ord(c)) for c in k.key)))
2245         self.assertTrue(k.query_vpp_config())
2246         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2247                                             self.pg0.remote_ip6, af=AF_INET6,
2248                                             sha1_key=k)
2249         self.vpp_session.add_vpp_config()
2250         self.vpp_session.admin_up()
2251         self.test_session = \
2252             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2253                            bfd_key_id=self.vpp_session.bfd_key_id)
2254         self.vapi.want_bfd_events()
2255         bfd_session_up(self)
2256         bfd_session_down(self)
2257         # try to replace the secret for the key - should fail because the key
2258         # is in-use
2259         k2 = self.factory.create_random_key(self)
2260         self.cli_verify_response(
2261             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2262             (k.conf_key_id,
2263                 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2264             "bfd key set: `bfd_auth_set_key' API call failed, "
2265             "rv=-103:BFD object in use")
2266         # manipulating the session using old secret should still work
2267         bfd_session_up(self)
2268         bfd_session_down(self)
2269         self.vpp_session.remove_vpp_config()
2270         self.cli_verify_no_response(
2271             "bfd key del conf-key-id %s" % k.conf_key_id)
2272         self.assertFalse(k.query_vpp_config())
2273
2274     def test_add_mod_del_bfd_udp(self):
2275         """ create/modify/delete IPv4 BFD UDP session """
2276         vpp_session = VppBFDUDPSession(
2277             self, self.pg0, self.pg0.remote_ip4)
2278         self.registry.register(vpp_session, self.logger)
2279         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2280             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2281             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2282                                 self.pg0.remote_ip4,
2283                                 vpp_session.desired_min_tx,
2284                                 vpp_session.required_min_rx,
2285                                 vpp_session.detect_mult)
2286         self.cli_verify_no_response(cli_add_cmd)
2287         # 2nd add should fail
2288         self.cli_verify_response(
2289             cli_add_cmd,
2290             "bfd udp session add: `bfd_add_add_session' API call"
2291             " failed, rv=-101:Duplicate BFD object")
2292         verify_bfd_session_config(self, vpp_session)
2293         mod_session = VppBFDUDPSession(
2294             self, self.pg0, self.pg0.remote_ip4,
2295             required_min_rx=2 * vpp_session.required_min_rx,
2296             desired_min_tx=3 * vpp_session.desired_min_tx,
2297             detect_mult=4 * vpp_session.detect_mult)
2298         self.cli_verify_no_response(
2299             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2300             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2301             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2302              mod_session.desired_min_tx, mod_session.required_min_rx,
2303              mod_session.detect_mult))
2304         verify_bfd_session_config(self, mod_session)
2305         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2306             "peer-addr %s" % (self.pg0.name,
2307                               self.pg0.local_ip4, self.pg0.remote_ip4)
2308         self.cli_verify_no_response(cli_del_cmd)
2309         # 2nd del is expected to fail
2310         self.cli_verify_response(
2311             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2312             " failed, rv=-102:No such BFD object")
2313         self.assertFalse(vpp_session.query_vpp_config())
2314
2315     def test_add_mod_del_bfd_udp6(self):
2316         """ create/modify/delete IPv6 BFD UDP session """
2317         vpp_session = VppBFDUDPSession(
2318             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2319         self.registry.register(vpp_session, self.logger)
2320         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2321             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2322             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2323                                 self.pg0.remote_ip6,
2324                                 vpp_session.desired_min_tx,
2325                                 vpp_session.required_min_rx,
2326                                 vpp_session.detect_mult)
2327         self.cli_verify_no_response(cli_add_cmd)
2328         # 2nd add should fail
2329         self.cli_verify_response(
2330             cli_add_cmd,
2331             "bfd udp session add: `bfd_add_add_session' API call"
2332             " failed, rv=-101:Duplicate BFD object")
2333         verify_bfd_session_config(self, vpp_session)
2334         mod_session = VppBFDUDPSession(
2335             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2336             required_min_rx=2 * vpp_session.required_min_rx,
2337             desired_min_tx=3 * vpp_session.desired_min_tx,
2338             detect_mult=4 * vpp_session.detect_mult)
2339         self.cli_verify_no_response(
2340             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2341             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2342             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2343              mod_session.desired_min_tx,
2344              mod_session.required_min_rx, mod_session.detect_mult))
2345         verify_bfd_session_config(self, mod_session)
2346         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2347             "peer-addr %s" % (self.pg0.name,
2348                               self.pg0.local_ip6, self.pg0.remote_ip6)
2349         self.cli_verify_no_response(cli_del_cmd)
2350         # 2nd del is expected to fail
2351         self.cli_verify_response(
2352             cli_del_cmd,
2353             "bfd udp session del: `bfd_udp_del_session' API call"
2354             " failed, rv=-102:No such BFD object")
2355         self.assertFalse(vpp_session.query_vpp_config())
2356
2357     def test_add_mod_del_bfd_udp_auth(self):
2358         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2359         key = self.factory.create_random_key(self)
2360         key.add_vpp_config()
2361         vpp_session = VppBFDUDPSession(
2362             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2363         self.registry.register(vpp_session, self.logger)
2364         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2365             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2366             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2367             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2368                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2369                vpp_session.detect_mult, key.conf_key_id,
2370                vpp_session.bfd_key_id)
2371         self.cli_verify_no_response(cli_add_cmd)
2372         # 2nd add should fail
2373         self.cli_verify_response(
2374             cli_add_cmd,
2375             "bfd udp session add: `bfd_add_add_session' API call"
2376             " failed, rv=-101:Duplicate BFD object")
2377         verify_bfd_session_config(self, vpp_session)
2378         mod_session = VppBFDUDPSession(
2379             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2380             bfd_key_id=vpp_session.bfd_key_id,
2381             required_min_rx=2 * vpp_session.required_min_rx,
2382             desired_min_tx=3 * vpp_session.desired_min_tx,
2383             detect_mult=4 * vpp_session.detect_mult)
2384         self.cli_verify_no_response(
2385             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2386             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2387             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2388              mod_session.desired_min_tx,
2389              mod_session.required_min_rx, mod_session.detect_mult))
2390         verify_bfd_session_config(self, mod_session)
2391         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2392             "peer-addr %s" % (self.pg0.name,
2393                               self.pg0.local_ip4, self.pg0.remote_ip4)
2394         self.cli_verify_no_response(cli_del_cmd)
2395         # 2nd del is expected to fail
2396         self.cli_verify_response(
2397             cli_del_cmd,
2398             "bfd udp session del: `bfd_udp_del_session' API call"
2399             " failed, rv=-102:No such BFD object")
2400         self.assertFalse(vpp_session.query_vpp_config())
2401
2402     def test_add_mod_del_bfd_udp6_auth(self):
2403         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2404         key = self.factory.create_random_key(
2405             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2406         key.add_vpp_config()
2407         vpp_session = VppBFDUDPSession(
2408             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2409         self.registry.register(vpp_session, self.logger)
2410         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2411             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2412             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2413             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2414                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2415                vpp_session.detect_mult, key.conf_key_id,
2416                vpp_session.bfd_key_id)
2417         self.cli_verify_no_response(cli_add_cmd)
2418         # 2nd add should fail
2419         self.cli_verify_response(
2420             cli_add_cmd,
2421             "bfd udp session add: `bfd_add_add_session' API call"
2422             " failed, rv=-101:Duplicate BFD object")
2423         verify_bfd_session_config(self, vpp_session)
2424         mod_session = VppBFDUDPSession(
2425             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2426             bfd_key_id=vpp_session.bfd_key_id,
2427             required_min_rx=2 * vpp_session.required_min_rx,
2428             desired_min_tx=3 * vpp_session.desired_min_tx,
2429             detect_mult=4 * vpp_session.detect_mult)
2430         self.cli_verify_no_response(
2431             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2432             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2433             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2434              mod_session.desired_min_tx,
2435              mod_session.required_min_rx, mod_session.detect_mult))
2436         verify_bfd_session_config(self, mod_session)
2437         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2438             "peer-addr %s" % (self.pg0.name,
2439                               self.pg0.local_ip6, self.pg0.remote_ip6)
2440         self.cli_verify_no_response(cli_del_cmd)
2441         # 2nd del is expected to fail
2442         self.cli_verify_response(
2443             cli_del_cmd,
2444             "bfd udp session del: `bfd_udp_del_session' API call"
2445             " failed, rv=-102:No such BFD object")
2446         self.assertFalse(vpp_session.query_vpp_config())
2447
2448     def test_auth_on_off(self):
2449         """ turn authentication on and off """
2450         key = self.factory.create_random_key(
2451             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2452         key.add_vpp_config()
2453         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2454         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2455                                         sha1_key=key)
2456         session.add_vpp_config()
2457         cli_activate = \
2458             "bfd udp session auth activate interface %s local-addr %s "\
2459             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2460             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2461                key.conf_key_id, auth_session.bfd_key_id)
2462         self.cli_verify_no_response(cli_activate)
2463         verify_bfd_session_config(self, auth_session)
2464         self.cli_verify_no_response(cli_activate)
2465         verify_bfd_session_config(self, auth_session)
2466         cli_deactivate = \
2467             "bfd udp session auth deactivate interface %s local-addr %s "\
2468             "peer-addr %s "\
2469             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2470         self.cli_verify_no_response(cli_deactivate)
2471         verify_bfd_session_config(self, session)
2472         self.cli_verify_no_response(cli_deactivate)
2473         verify_bfd_session_config(self, session)
2474
2475     def test_auth_on_off_delayed(self):
2476         """ turn authentication on and off (delayed) """
2477         key = self.factory.create_random_key(
2478             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2479         key.add_vpp_config()
2480         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2481         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2482                                         sha1_key=key)
2483         session.add_vpp_config()
2484         cli_activate = \
2485             "bfd udp session auth activate interface %s local-addr %s "\
2486             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2487             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2488                key.conf_key_id, auth_session.bfd_key_id)
2489         self.cli_verify_no_response(cli_activate)
2490         verify_bfd_session_config(self, auth_session)
2491         self.cli_verify_no_response(cli_activate)
2492         verify_bfd_session_config(self, auth_session)
2493         cli_deactivate = \
2494             "bfd udp session auth deactivate interface %s local-addr %s "\
2495             "peer-addr %s delayed yes"\
2496             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2497         self.cli_verify_no_response(cli_deactivate)
2498         verify_bfd_session_config(self, session)
2499         self.cli_verify_no_response(cli_deactivate)
2500         verify_bfd_session_config(self, session)
2501
2502     def test_admin_up_down(self):
2503         """ put session admin-up and admin-down """
2504         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2505         session.add_vpp_config()
2506         cli_down = \
2507             "bfd udp session set-flags admin down interface %s local-addr %s "\
2508             "peer-addr %s "\
2509             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2510         cli_up = \
2511             "bfd udp session set-flags admin up interface %s local-addr %s "\
2512             "peer-addr %s "\
2513             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2514         self.cli_verify_no_response(cli_down)
2515         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2516         self.cli_verify_no_response(cli_up)
2517         verify_bfd_session_config(self, session, state=BFDState.down)
2518
2519     def test_set_del_udp_echo_source(self):
2520         """ set/del udp echo source """
2521         self.create_loopback_interfaces([0])
2522         self.loopback0 = self.lo_interfaces[0]
2523         self.loopback0.admin_up()
2524         self.cli_verify_response("show bfd echo-source",
2525                                  "UDP echo source is not set.")
2526         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2527         self.cli_verify_no_response(cli_set)
2528         self.cli_verify_response("show bfd echo-source",
2529                                  "UDP echo source is: %s\n"
2530                                  "IPv4 address usable as echo source: none\n"
2531                                  "IPv6 address usable as echo source: none" %
2532                                  self.loopback0.name)
2533         self.loopback0.config_ip4()
2534         unpacked = unpack("!L", self.loopback0.local_ip4n)
2535         echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2536         self.cli_verify_response("show bfd echo-source",
2537                                  "UDP echo source is: %s\n"
2538                                  "IPv4 address usable as echo source: %s\n"
2539                                  "IPv6 address usable as echo source: none" %
2540                                  (self.loopback0.name, echo_ip4))
2541         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2542         echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2543                                             unpacked[2], unpacked[3] ^ 1))
2544         self.loopback0.config_ip6()
2545         self.cli_verify_response("show bfd echo-source",
2546                                  "UDP echo source is: %s\n"
2547                                  "IPv4 address usable as echo source: %s\n"
2548                                  "IPv6 address usable as echo source: %s" %
2549                                  (self.loopback0.name, echo_ip4, echo_ip6))
2550         cli_del = "bfd udp echo-source del"
2551         self.cli_verify_no_response(cli_del)
2552         self.cli_verify_response("show bfd echo-source",
2553                                  "UDP echo source is not set.")
2554
2555 if __name__ == '__main__':
2556     unittest.main(testRunner=VppTestRunner)