Add MAC address check in ethernet-input node if interface in L3 mode
[vpp.git] / test / test_bfd.py
1 #!/usr/bin/env python
2 """ BFD tests """
3
4 from __future__ import division
5 import unittest
6 import hashlib
7 import binascii
8 import time
9 from struct import pack, unpack
10 from random import randint, shuffle, getrandbits
11 from socket import AF_INET, AF_INET6, inet_ntop
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import UDP, IP
15 from scapy.layers.inet6 import IPv6
16 from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
17     BFDDiagCode, BFDState, BFD_vpp_echo
18 from framework import VppTestCase, VppTestRunner, running_extended_tests
19 from vpp_pg_interface import CaptureTimeoutError
20 from util import ppp
21 from vpp_papi_provider import UnexpectedApiReturnValueError
22
23 USEC_IN_SEC = 1000000
24
25
26 class AuthKeyFactory(object):
27     """Factory class for creating auth keys with unique conf key ID"""
28
29     def __init__(self):
30         self._conf_key_ids = {}
31
32     def create_random_key(self, test, auth_type=BFDAuthType.keyed_sha1):
33         """ create a random key with unique conf key id """
34         conf_key_id = randint(0, 0xFFFFFFFF)
35         while conf_key_id in self._conf_key_ids:
36             conf_key_id = randint(0, 0xFFFFFFFF)
37         self._conf_key_ids[conf_key_id] = 1
38         key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
39         return VppBFDAuthKey(test=test, auth_type=auth_type,
40                              conf_key_id=conf_key_id, key=key)
41
42
43 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
44 class BFDAPITestCase(VppTestCase):
45     """Bidirectional Forwarding Detection (BFD) - API"""
46
47     pg0 = None
48     pg1 = None
49
50     @classmethod
51     def setUpClass(cls):
52         super(BFDAPITestCase, cls).setUpClass()
53
54         try:
55             cls.create_pg_interfaces(range(2))
56             for i in cls.pg_interfaces:
57                 i.config_ip4()
58                 i.config_ip6()
59                 i.resolve_arp()
60
61         except Exception:
62             super(BFDAPITestCase, cls).tearDownClass()
63             raise
64
65     def setUp(self):
66         super(BFDAPITestCase, self).setUp()
67         self.factory = AuthKeyFactory()
68
69     def test_add_bfd(self):
70         """ create a BFD session """
71         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
72         session.add_vpp_config()
73         self.logger.debug("Session state is %s", session.state)
74         session.remove_vpp_config()
75         session.add_vpp_config()
76         self.logger.debug("Session state is %s", session.state)
77         session.remove_vpp_config()
78
79     def test_double_add(self):
80         """ create the same BFD session twice (negative case) """
81         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
82         session.add_vpp_config()
83
84         with self.vapi.expect_negative_api_retval():
85             session.add_vpp_config()
86
87         session.remove_vpp_config()
88
89     def test_add_bfd6(self):
90         """ create IPv6 BFD session """
91         session = VppBFDUDPSession(
92             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
93         session.add_vpp_config()
94         self.logger.debug("Session state is %s", session.state)
95         session.remove_vpp_config()
96         session.add_vpp_config()
97         self.logger.debug("Session state is %s", session.state)
98         session.remove_vpp_config()
99
100     def test_mod_bfd(self):
101         """ modify BFD session parameters """
102         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
103                                    desired_min_tx=50000,
104                                    required_min_rx=10000,
105                                    detect_mult=1)
106         session.add_vpp_config()
107         s = session.get_bfd_udp_session_dump_entry()
108         self.assert_equal(session.desired_min_tx,
109                           s.desired_min_tx,
110                           "desired min transmit interval")
111         self.assert_equal(session.required_min_rx,
112                           s.required_min_rx,
113                           "required min receive interval")
114         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
115         session.modify_parameters(desired_min_tx=session.desired_min_tx * 2,
116                                   required_min_rx=session.required_min_rx * 2,
117                                   detect_mult=session.detect_mult * 2)
118         s = session.get_bfd_udp_session_dump_entry()
119         self.assert_equal(session.desired_min_tx,
120                           s.desired_min_tx,
121                           "desired min transmit interval")
122         self.assert_equal(session.required_min_rx,
123                           s.required_min_rx,
124                           "required min receive interval")
125         self.assert_equal(session.detect_mult, s.detect_mult, "detect mult")
126
127     def test_add_sha1_keys(self):
128         """ add SHA1 keys """
129         key_count = 10
130         keys = [self.factory.create_random_key(
131             self) for i in range(0, key_count)]
132         for key in keys:
133             self.assertFalse(key.query_vpp_config())
134         for key in keys:
135             key.add_vpp_config()
136         for key in keys:
137             self.assertTrue(key.query_vpp_config())
138         # remove randomly
139         indexes = range(key_count)
140         shuffle(indexes)
141         removed = []
142         for i in indexes:
143             key = keys[i]
144             key.remove_vpp_config()
145             removed.append(i)
146             for j in range(key_count):
147                 key = keys[j]
148                 if j in removed:
149                     self.assertFalse(key.query_vpp_config())
150                 else:
151                     self.assertTrue(key.query_vpp_config())
152         # should be removed now
153         for key in keys:
154             self.assertFalse(key.query_vpp_config())
155         # add back and remove again
156         for key in keys:
157             key.add_vpp_config()
158         for key in keys:
159             self.assertTrue(key.query_vpp_config())
160         for key in keys:
161             key.remove_vpp_config()
162         for key in keys:
163             self.assertFalse(key.query_vpp_config())
164
165     def test_add_bfd_sha1(self):
166         """ create a BFD session (SHA1) """
167         key = self.factory.create_random_key(self)
168         key.add_vpp_config()
169         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
170                                    sha1_key=key)
171         session.add_vpp_config()
172         self.logger.debug("Session state is %s", session.state)
173         session.remove_vpp_config()
174         session.add_vpp_config()
175         self.logger.debug("Session state is %s", session.state)
176         session.remove_vpp_config()
177
178     def test_double_add_sha1(self):
179         """ create the same BFD session twice (negative case) (SHA1) """
180         key = self.factory.create_random_key(self)
181         key.add_vpp_config()
182         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
183                                    sha1_key=key)
184         session.add_vpp_config()
185         with self.assertRaises(Exception):
186             session.add_vpp_config()
187
188     def test_add_auth_nonexistent_key(self):
189         """ create BFD session using non-existent SHA1 (negative case) """
190         session = VppBFDUDPSession(
191             self, self.pg0, self.pg0.remote_ip4,
192             sha1_key=self.factory.create_random_key(self))
193         with self.assertRaises(Exception):
194             session.add_vpp_config()
195
196     def test_shared_sha1_key(self):
197         """ share single SHA1 key between multiple BFD sessions """
198         key = self.factory.create_random_key(self)
199         key.add_vpp_config()
200         sessions = [
201             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
202                              sha1_key=key),
203             VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6,
204                              sha1_key=key, af=AF_INET6),
205             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip4,
206                              sha1_key=key),
207             VppBFDUDPSession(self, self.pg1, self.pg1.remote_ip6,
208                              sha1_key=key, af=AF_INET6)]
209         for s in sessions:
210             s.add_vpp_config()
211         removed = 0
212         for s in sessions:
213             e = key.get_bfd_auth_keys_dump_entry()
214             self.assert_equal(e.use_count, len(sessions) - removed,
215                               "Use count for shared key")
216             s.remove_vpp_config()
217             removed += 1
218         e = key.get_bfd_auth_keys_dump_entry()
219         self.assert_equal(e.use_count, len(sessions) - removed,
220                           "Use count for shared key")
221
222     def test_activate_auth(self):
223         """ activate SHA1 authentication """
224         key = self.factory.create_random_key(self)
225         key.add_vpp_config()
226         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
227         session.add_vpp_config()
228         session.activate_auth(key)
229
230     def test_deactivate_auth(self):
231         """ deactivate SHA1 authentication """
232         key = self.factory.create_random_key(self)
233         key.add_vpp_config()
234         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
235         session.add_vpp_config()
236         session.activate_auth(key)
237         session.deactivate_auth()
238
239     def test_change_key(self):
240         """ change SHA1 key """
241         key1 = self.factory.create_random_key(self)
242         key2 = self.factory.create_random_key(self)
243         while key2.conf_key_id == key1.conf_key_id:
244             key2 = self.factory.create_random_key(self)
245         key1.add_vpp_config()
246         key2.add_vpp_config()
247         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
248                                    sha1_key=key1)
249         session.add_vpp_config()
250         session.activate_auth(key2)
251
252
253 class BFDTestSession(object):
254     """ BFD session as seen from test framework side """
255
256     def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
257                  bfd_key_id=None, our_seq_number=None):
258         self.test = test
259         self.af = af
260         self.sha1_key = sha1_key
261         self.bfd_key_id = bfd_key_id
262         self.interface = interface
263         self.udp_sport = randint(49152, 65535)
264         if our_seq_number is None:
265             self.our_seq_number = randint(0, 40000000)
266         else:
267             self.our_seq_number = our_seq_number
268         self.vpp_seq_number = None
269         self.my_discriminator = 0
270         self.desired_min_tx = 100000
271         self.required_min_rx = 100000
272         self.required_min_echo_rx = None
273         self.detect_mult = detect_mult
274         self.diag = BFDDiagCode.no_diagnostic
275         self.your_discriminator = None
276         self.state = BFDState.down
277         self.auth_type = BFDAuthType.no_auth
278
279     def inc_seq_num(self):
280         """ increment sequence number, wrapping if needed """
281         if self.our_seq_number == 0xFFFFFFFF:
282             self.our_seq_number = 0
283         else:
284             self.our_seq_number += 1
285
286     def update(self, my_discriminator=None, your_discriminator=None,
287                desired_min_tx=None, required_min_rx=None,
288                required_min_echo_rx=None, detect_mult=None,
289                diag=None, state=None, auth_type=None):
290         """ update BFD parameters associated with session """
291         if my_discriminator is not None:
292             self.my_discriminator = my_discriminator
293         if your_discriminator is not None:
294             self.your_discriminator = your_discriminator
295         if required_min_rx is not None:
296             self.required_min_rx = required_min_rx
297         if required_min_echo_rx is not None:
298             self.required_min_echo_rx = required_min_echo_rx
299         if desired_min_tx is not None:
300             self.desired_min_tx = desired_min_tx
301         if detect_mult is not None:
302             self.detect_mult = detect_mult
303         if diag is not None:
304             self.diag = diag
305         if state is not None:
306             self.state = state
307         if auth_type is not None:
308             self.auth_type = auth_type
309
310     def fill_packet_fields(self, packet):
311         """ set packet fields with known values in packet """
312         bfd = packet[BFD]
313         if self.my_discriminator:
314             self.test.logger.debug("BFD: setting packet.my_discriminator=%s",
315                                    self.my_discriminator)
316             bfd.my_discriminator = self.my_discriminator
317         if self.your_discriminator:
318             self.test.logger.debug("BFD: setting packet.your_discriminator=%s",
319                                    self.your_discriminator)
320             bfd.your_discriminator = self.your_discriminator
321         if self.required_min_rx:
322             self.test.logger.debug(
323                 "BFD: setting packet.required_min_rx_interval=%s",
324                 self.required_min_rx)
325             bfd.required_min_rx_interval = self.required_min_rx
326         if self.required_min_echo_rx:
327             self.test.logger.debug(
328                 "BFD: setting packet.required_min_echo_rx=%s",
329                 self.required_min_echo_rx)
330             bfd.required_min_echo_rx_interval = self.required_min_echo_rx
331         if self.desired_min_tx:
332             self.test.logger.debug(
333                 "BFD: setting packet.desired_min_tx_interval=%s",
334                 self.desired_min_tx)
335             bfd.desired_min_tx_interval = self.desired_min_tx
336         if self.detect_mult:
337             self.test.logger.debug(
338                 "BFD: setting packet.detect_mult=%s", self.detect_mult)
339             bfd.detect_mult = self.detect_mult
340         if self.diag:
341             self.test.logger.debug("BFD: setting packet.diag=%s", self.diag)
342             bfd.diag = self.diag
343         if self.state:
344             self.test.logger.debug("BFD: setting packet.state=%s", self.state)
345             bfd.state = self.state
346         if self.auth_type:
347             # this is used by a negative test-case
348             self.test.logger.debug("BFD: setting packet.auth_type=%s",
349                                    self.auth_type)
350             bfd.auth_type = self.auth_type
351
352     def create_packet(self):
353         """ create a BFD packet, reflecting the current state of session """
354         if self.sha1_key:
355             bfd = BFD(flags="A")
356             bfd.auth_type = self.sha1_key.auth_type
357             bfd.auth_len = BFD.sha1_auth_len
358             bfd.auth_key_id = self.bfd_key_id
359             bfd.auth_seq_num = self.our_seq_number
360             bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
361         else:
362             bfd = BFD()
363         if self.af == AF_INET6:
364             packet = (Ether(src=self.interface.remote_mac,
365                             dst=self.interface.local_mac) /
366                       IPv6(src=self.interface.remote_ip6,
367                            dst=self.interface.local_ip6,
368                            hlim=255) /
369                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
370                       bfd)
371         else:
372             packet = (Ether(src=self.interface.remote_mac,
373                             dst=self.interface.local_mac) /
374                       IP(src=self.interface.remote_ip4,
375                          dst=self.interface.local_ip4,
376                          ttl=255) /
377                       UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
378                       bfd)
379         self.test.logger.debug("BFD: Creating packet")
380         self.fill_packet_fields(packet)
381         if self.sha1_key:
382             hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
383                 "\0" * (20 - len(self.sha1_key.key))
384             self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
385                                    hashlib.sha1(hash_material).hexdigest())
386             packet[BFD].auth_key_hash = hashlib.sha1(hash_material).digest()
387         return packet
388
389     def send_packet(self, packet=None, interface=None):
390         """ send packet on interface, creating the packet if needed """
391         if packet is None:
392             packet = self.create_packet()
393         if interface is None:
394             interface = self.test.pg0
395         self.test.logger.debug(ppp("Sending packet:", packet))
396         interface.add_stream(packet)
397         self.test.pg_start()
398
399     def verify_sha1_auth(self, packet):
400         """ Verify correctness of authentication in BFD layer. """
401         bfd = packet[BFD]
402         self.test.assert_equal(bfd.auth_len, 28, "Auth section length")
403         self.test.assert_equal(bfd.auth_type, self.sha1_key.auth_type,
404                                BFDAuthType)
405         self.test.assert_equal(bfd.auth_key_id, self.bfd_key_id, "Key ID")
406         self.test.assert_equal(bfd.auth_reserved, 0, "Reserved")
407         if self.vpp_seq_number is None:
408             self.vpp_seq_number = bfd.auth_seq_num
409             self.test.logger.debug("Received initial sequence number: %s" %
410                                    self.vpp_seq_number)
411         else:
412             recvd_seq_num = bfd.auth_seq_num
413             self.test.logger.debug("Received followup sequence number: %s" %
414                                    recvd_seq_num)
415             if self.vpp_seq_number < 0xffffffff:
416                 if self.sha1_key.auth_type == \
417                         BFDAuthType.meticulous_keyed_sha1:
418                     self.test.assert_equal(recvd_seq_num,
419                                            self.vpp_seq_number + 1,
420                                            "BFD sequence number")
421                 else:
422                     self.test.assert_in_range(recvd_seq_num,
423                                               self.vpp_seq_number,
424                                               self.vpp_seq_number + 1,
425                                               "BFD sequence number")
426             else:
427                 if self.sha1_key.auth_type == \
428                         BFDAuthType.meticulous_keyed_sha1:
429                     self.test.assert_equal(recvd_seq_num, 0,
430                                            "BFD sequence number")
431                 else:
432                     self.test.assertIn(recvd_seq_num, (self.vpp_seq_number, 0),
433                                        "BFD sequence number not one of "
434                                        "(%s, 0)" % self.vpp_seq_number)
435             self.vpp_seq_number = recvd_seq_num
436         # last 20 bytes represent the hash - so replace them with the key,
437         # pad the result with zeros and hash the result
438         hash_material = bfd.original[:-20] + self.sha1_key.key + \
439             "\0" * (20 - len(self.sha1_key.key))
440         expected_hash = hashlib.sha1(hash_material).hexdigest()
441         self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
442                                expected_hash, "Auth key hash")
443
444     def verify_bfd(self, packet):
445         """ Verify correctness of BFD layer. """
446         bfd = packet[BFD]
447         self.test.assert_equal(bfd.version, 1, "BFD version")
448         self.test.assert_equal(bfd.your_discriminator,
449                                self.my_discriminator,
450                                "BFD - your discriminator")
451         if self.sha1_key:
452             self.verify_sha1_auth(packet)
453
454
455 def bfd_session_up(test):
456     """ Bring BFD session up """
457     test.logger.info("BFD: Waiting for slow hello")
458     p = wait_for_bfd_packet(test, 2)
459     old_offset = None
460     if hasattr(test, 'vpp_clock_offset'):
461         old_offset = test.vpp_clock_offset
462     test.vpp_clock_offset = time.time() - p.time
463     test.logger.debug("BFD: Calculated vpp clock offset: %s",
464                       test.vpp_clock_offset)
465     if old_offset:
466         test.assertAlmostEqual(
467             old_offset, test.vpp_clock_offset, delta=0.5,
468             msg="vpp clock offset not stable (new: %s, old: %s)" %
469             (test.vpp_clock_offset, old_offset))
470     test.logger.info("BFD: Sending Init")
471     test.test_session.update(my_discriminator=randint(0, 40000000),
472                              your_discriminator=p[BFD].my_discriminator,
473                              state=BFDState.init)
474     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
475             BFDAuthType.meticulous_keyed_sha1:
476         test.test_session.inc_seq_num()
477     test.test_session.send_packet()
478     test.logger.info("BFD: Waiting for event")
479     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
480     verify_event(test, e, expected_state=BFDState.up)
481     test.logger.info("BFD: Session is Up")
482     test.test_session.update(state=BFDState.up)
483     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
484             BFDAuthType.meticulous_keyed_sha1:
485         test.test_session.inc_seq_num()
486     test.test_session.send_packet()
487     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
488
489
490 def bfd_session_down(test):
491     """ Bring BFD session down """
492     test.assert_equal(test.vpp_session.state, BFDState.up, BFDState)
493     test.test_session.update(state=BFDState.down)
494     if test.test_session.sha1_key and test.test_session.sha1_key.auth_type == \
495             BFDAuthType.meticulous_keyed_sha1:
496         test.test_session.inc_seq_num()
497     test.test_session.send_packet()
498     test.logger.info("BFD: Waiting for event")
499     e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
500     verify_event(test, e, expected_state=BFDState.down)
501     test.logger.info("BFD: Session is Down")
502     test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
503
504
505 def verify_bfd_session_config(test, session, state=None):
506     dump = session.get_bfd_udp_session_dump_entry()
507     test.assertIsNotNone(dump)
508     # since dump is not none, we have verified that sw_if_index and addresses
509     # are valid (in get_bfd_udp_session_dump_entry)
510     if state:
511         test.assert_equal(dump.state, state, "session state")
512     test.assert_equal(dump.required_min_rx, session.required_min_rx,
513                       "required min rx interval")
514     test.assert_equal(dump.desired_min_tx, session.desired_min_tx,
515                       "desired min tx interval")
516     test.assert_equal(dump.detect_mult, session.detect_mult,
517                       "detect multiplier")
518     if session.sha1_key is None:
519         test.assert_equal(dump.is_authenticated, 0, "is_authenticated flag")
520     else:
521         test.assert_equal(dump.is_authenticated, 1, "is_authenticated flag")
522         test.assert_equal(dump.bfd_key_id, session.bfd_key_id,
523                           "bfd key id")
524         test.assert_equal(dump.conf_key_id,
525                           session.sha1_key.conf_key_id,
526                           "config key id")
527
528
529 def verify_ip(test, packet):
530     """ Verify correctness of IP layer. """
531     if test.vpp_session.af == AF_INET6:
532         ip = packet[IPv6]
533         local_ip = test.pg0.local_ip6
534         remote_ip = test.pg0.remote_ip6
535         test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
536     else:
537         ip = packet[IP]
538         local_ip = test.pg0.local_ip4
539         remote_ip = test.pg0.remote_ip4
540         test.assert_equal(ip.ttl, 255, "IPv4 TTL")
541     test.assert_equal(ip.src, local_ip, "IP source address")
542     test.assert_equal(ip.dst, remote_ip, "IP destination address")
543
544
545 def verify_udp(test, packet):
546     """ Verify correctness of UDP layer. """
547     udp = packet[UDP]
548     test.assert_equal(udp.dport, BFD.udp_dport, "UDP destination port")
549     test.assert_in_range(udp.sport, BFD.udp_sport_min, BFD.udp_sport_max,
550                          "UDP source port")
551
552
553 def verify_event(test, event, expected_state):
554     """ Verify correctness of event values. """
555     e = event
556     test.logger.debug("BFD: Event: %s" % repr(e))
557     test.assert_equal(e.sw_if_index,
558                       test.vpp_session.interface.sw_if_index,
559                       "BFD interface index")
560     is_ipv6 = 0
561     if test.vpp_session.af == AF_INET6:
562         is_ipv6 = 1
563     test.assert_equal(e.is_ipv6, is_ipv6, "is_ipv6")
564     if test.vpp_session.af == AF_INET:
565         test.assert_equal(e.local_addr[:4], test.vpp_session.local_addr_n,
566                           "Local IPv4 address")
567         test.assert_equal(e.peer_addr[:4], test.vpp_session.peer_addr_n,
568                           "Peer IPv4 address")
569     else:
570         test.assert_equal(e.local_addr, test.vpp_session.local_addr_n,
571                           "Local IPv6 address")
572         test.assert_equal(e.peer_addr, test.vpp_session.peer_addr_n,
573                           "Peer IPv6 address")
574     test.assert_equal(e.state, expected_state, BFDState)
575
576
577 def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
578     """ wait for BFD packet and verify its correctness
579
580     :param timeout: how long to wait
581     :param pcap_time_min: ignore packets with pcap timestamp lower than this
582
583     :returns: tuple (packet, time spent waiting for packet)
584     """
585     test.logger.info("BFD: Waiting for BFD packet")
586     deadline = time.time() + timeout
587     counter = 0
588     while True:
589         counter += 1
590         # sanity check
591         test.assert_in_range(counter, 0, 100, "number of packets ignored")
592         time_left = deadline - time.time()
593         if time_left < 0:
594             raise CaptureTimeoutError("Packet did not arrive within timeout")
595         p = test.pg0.wait_for_packet(timeout=time_left)
596         test.logger.debug(ppp("BFD: Got packet:", p))
597         if pcap_time_min is not None and p.time < pcap_time_min:
598             test.logger.debug(ppp("BFD: ignoring packet (pcap time %s < "
599                                   "pcap time min %s):" %
600                                   (p.time, pcap_time_min), p))
601         else:
602             break
603     bfd = p[BFD]
604     if bfd is None:
605         raise Exception(ppp("Unexpected or invalid BFD packet:", p))
606     if bfd.payload:
607         raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
608     verify_ip(test, p)
609     verify_udp(test, p)
610     test.test_session.verify_bfd(p)
611     return p
612
613
614 class BFD4TestCase(VppTestCase):
615     """Bidirectional Forwarding Detection (BFD)"""
616
617     pg0 = None
618     vpp_clock_offset = None
619     vpp_session = None
620     test_session = None
621
622     @classmethod
623     def setUpClass(cls):
624         super(BFD4TestCase, cls).setUpClass()
625         try:
626             cls.create_pg_interfaces([0])
627             cls.create_loopback_interfaces([0])
628             cls.loopback0 = cls.lo_interfaces[0]
629             cls.loopback0.config_ip4()
630             cls.loopback0.admin_up()
631             cls.pg0.config_ip4()
632             cls.pg0.configure_ipv4_neighbors()
633             cls.pg0.admin_up()
634             cls.pg0.resolve_arp()
635
636         except Exception:
637             super(BFD4TestCase, cls).tearDownClass()
638             raise
639
640     def setUp(self):
641         super(BFD4TestCase, self).setUp()
642         self.factory = AuthKeyFactory()
643         self.vapi.want_bfd_events()
644         self.pg0.enable_capture()
645         try:
646             self.vpp_session = VppBFDUDPSession(self, self.pg0,
647                                                 self.pg0.remote_ip4)
648             self.vpp_session.add_vpp_config()
649             self.vpp_session.admin_up()
650             self.test_session = BFDTestSession(self, self.pg0, AF_INET)
651         except:
652             self.vapi.want_bfd_events(enable_disable=0)
653             raise
654
655     def tearDown(self):
656         if not self.vpp_dead:
657             self.vapi.want_bfd_events(enable_disable=0)
658         self.vapi.collect_events()  # clear the event queue
659         super(BFD4TestCase, self).tearDown()
660
661     def test_session_up(self):
662         """ bring BFD session up """
663         bfd_session_up(self)
664
665     def test_session_up_by_ip(self):
666         """ bring BFD session up - first frame looked up by address pair """
667         self.logger.info("BFD: Sending Slow control frame")
668         self.test_session.update(my_discriminator=randint(0, 40000000))
669         self.test_session.send_packet()
670         self.pg0.enable_capture()
671         p = self.pg0.wait_for_packet(1)
672         self.assert_equal(p[BFD].your_discriminator,
673                           self.test_session.my_discriminator,
674                           "BFD - your discriminator")
675         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
676         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
677                                  state=BFDState.up)
678         self.logger.info("BFD: Waiting for event")
679         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
680         verify_event(self, e, expected_state=BFDState.init)
681         self.logger.info("BFD: Sending Up")
682         self.test_session.send_packet()
683         self.logger.info("BFD: Waiting for event")
684         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
685         verify_event(self, e, expected_state=BFDState.up)
686         self.logger.info("BFD: Session is Up")
687         self.test_session.update(state=BFDState.up)
688         self.test_session.send_packet()
689         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
690
691     def test_session_down(self):
692         """ bring BFD session down """
693         bfd_session_up(self)
694         bfd_session_down(self)
695
696     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
697     def test_hold_up(self):
698         """ hold BFD session up """
699         bfd_session_up(self)
700         for dummy in range(self.test_session.detect_mult * 2):
701             wait_for_bfd_packet(self)
702             self.test_session.send_packet()
703         self.assert_equal(len(self.vapi.collect_events()), 0,
704                           "number of bfd events")
705
706     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
707     def test_slow_timer(self):
708         """ verify slow periodic control frames while session down """
709         packet_count = 3
710         self.logger.info("BFD: Waiting for %d BFD packets", packet_count)
711         prev_packet = wait_for_bfd_packet(self, 2)
712         for dummy in range(packet_count):
713             next_packet = wait_for_bfd_packet(self, 2)
714             time_diff = next_packet.time - prev_packet.time
715             # spec says the range should be <0.75, 1>, allow extra 0.05 margin
716             # to work around timing issues
717             self.assert_in_range(
718                 time_diff, 0.70, 1.05, "time between slow packets")
719             prev_packet = next_packet
720
721     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
722     def test_zero_remote_min_rx(self):
723         """ no packets when zero remote required min rx interval """
724         bfd_session_up(self)
725         self.test_session.update(required_min_rx=0)
726         self.test_session.send_packet()
727         for dummy in range(self.test_session.detect_mult):
728             self.sleep(self.vpp_session.required_min_rx / USEC_IN_SEC,
729                        "sleep before transmitting bfd packet")
730             self.test_session.send_packet()
731             try:
732                 p = wait_for_bfd_packet(self, timeout=0)
733                 self.logger.error(ppp("Received unexpected packet:", p))
734             except CaptureTimeoutError:
735                 pass
736         self.assert_equal(
737             len(self.vapi.collect_events()), 0, "number of bfd events")
738         self.test_session.update(required_min_rx=100000)
739         for dummy in range(3):
740             self.test_session.send_packet()
741             wait_for_bfd_packet(
742                 self, timeout=self.test_session.required_min_rx / USEC_IN_SEC)
743         self.assert_equal(
744             len(self.vapi.collect_events()), 0, "number of bfd events")
745
746     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
747     def test_conn_down(self):
748         """ verify session goes down after inactivity """
749         bfd_session_up(self)
750         detection_time = self.test_session.detect_mult *\
751             self.vpp_session.required_min_rx / USEC_IN_SEC
752         self.sleep(detection_time, "waiting for BFD session time-out")
753         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
754         verify_event(self, e, expected_state=BFDState.down)
755
756     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
757     def test_large_required_min_rx(self):
758         """ large remote required min rx interval """
759         bfd_session_up(self)
760         p = wait_for_bfd_packet(self)
761         interval = 3000000
762         self.test_session.update(required_min_rx=interval)
763         self.test_session.send_packet()
764         time_mark = time.time()
765         count = 0
766         # busy wait here, trying to collect a packet or event, vpp is not
767         # allowed to send packets and the session will timeout first - so the
768         # Up->Down event must arrive before any packets do
769         while time.time() < time_mark + interval / USEC_IN_SEC:
770             try:
771                 p = wait_for_bfd_packet(self, timeout=0)
772                 # if vpp managed to send a packet before we did the session
773                 # session update, then that's fine, ignore it
774                 if p.time < time_mark - self.vpp_clock_offset:
775                     continue
776                 self.logger.error(ppp("Received unexpected packet:", p))
777                 count += 1
778             except CaptureTimeoutError:
779                 pass
780             events = self.vapi.collect_events()
781             if len(events) > 0:
782                 verify_event(self, events[0], BFDState.down)
783                 break
784         self.assert_equal(count, 0, "number of packets received")
785
786     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
787     def test_immediate_remote_min_rx_reduction(self):
788         """ immediately honor remote required min rx reduction """
789         self.vpp_session.remove_vpp_config()
790         self.vpp_session = VppBFDUDPSession(
791             self, self.pg0, self.pg0.remote_ip4, desired_min_tx=10000)
792         self.pg0.enable_capture()
793         self.vpp_session.add_vpp_config()
794         self.test_session.update(desired_min_tx=1000000,
795                                  required_min_rx=1000000)
796         bfd_session_up(self)
797         reference_packet = wait_for_bfd_packet(self)
798         time_mark = time.time()
799         interval = 300000
800         self.test_session.update(required_min_rx=interval)
801         self.test_session.send_packet()
802         extra_time = time.time() - time_mark
803         p = wait_for_bfd_packet(self)
804         # first packet is allowed to be late by time we spent doing the update
805         # calculated in extra_time
806         self.assert_in_range(p.time - reference_packet.time,
807                              .95 * 0.75 * interval / USEC_IN_SEC,
808                              1.05 * interval / USEC_IN_SEC + extra_time,
809                              "time between BFD packets")
810         reference_packet = p
811         for dummy in range(3):
812             p = wait_for_bfd_packet(self)
813             diff = p.time - reference_packet.time
814             self.assert_in_range(diff, .95 * .75 * interval / USEC_IN_SEC,
815                                  1.05 * interval / USEC_IN_SEC,
816                                  "time between BFD packets")
817             reference_packet = p
818
819     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
820     def test_modify_req_min_rx_double(self):
821         """ modify session - double required min rx """
822         bfd_session_up(self)
823         p = wait_for_bfd_packet(self)
824         self.test_session.update(desired_min_tx=10000,
825                                  required_min_rx=10000)
826         self.test_session.send_packet()
827         # double required min rx
828         self.vpp_session.modify_parameters(
829             required_min_rx=2 * self.vpp_session.required_min_rx)
830         p = wait_for_bfd_packet(
831             self, pcap_time_min=time.time() - self.vpp_clock_offset)
832         # poll bit needs to be set
833         self.assertIn("P", p.sprintf("%BFD.flags%"),
834                       "Poll bit not set in BFD packet")
835         # finish poll sequence with final packet
836         final = self.test_session.create_packet()
837         final[BFD].flags = "F"
838         timeout = self.test_session.detect_mult * \
839             max(self.test_session.desired_min_tx,
840                 self.vpp_session.required_min_rx) / USEC_IN_SEC
841         self.test_session.send_packet(final)
842         time_mark = time.time()
843         e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
844         verify_event(self, e, expected_state=BFDState.down)
845         time_to_event = time.time() - time_mark
846         self.assert_in_range(time_to_event, .9 * timeout,
847                              1.1 * timeout, "session timeout")
848
849     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
850     def test_modify_req_min_rx_halve(self):
851         """ modify session - halve required min rx """
852         self.vpp_session.modify_parameters(
853             required_min_rx=2 * self.vpp_session.required_min_rx)
854         bfd_session_up(self)
855         p = wait_for_bfd_packet(self)
856         self.test_session.update(desired_min_tx=10000,
857                                  required_min_rx=10000)
858         self.test_session.send_packet()
859         p = wait_for_bfd_packet(
860             self, pcap_time_min=time.time() - self.vpp_clock_offset)
861         # halve required min rx
862         old_required_min_rx = self.vpp_session.required_min_rx
863         self.vpp_session.modify_parameters(
864             required_min_rx=0.5 * self.vpp_session.required_min_rx)
865         # now we wait 0.8*3*old-req-min-rx and the session should still be up
866         self.sleep(0.8 * self.vpp_session.detect_mult *
867                    old_required_min_rx / USEC_IN_SEC)
868         self.assert_equal(len(self.vapi.collect_events()), 0,
869                           "number of bfd events")
870         p = wait_for_bfd_packet(self)
871         # poll bit needs to be set
872         self.assertIn("P", p.sprintf("%BFD.flags%"),
873                       "Poll bit not set in BFD packet")
874         # finish poll sequence with final packet
875         final = self.test_session.create_packet()
876         final[BFD].flags = "F"
877         self.test_session.send_packet(final)
878         # now the session should time out under new conditions
879         before = time.time()
880         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
881         after = time.time()
882         detection_time = self.test_session.detect_mult *\
883             self.vpp_session.required_min_rx / USEC_IN_SEC
884         self.assert_in_range(after - before,
885                              0.9 * detection_time,
886                              1.1 * detection_time,
887                              "time before bfd session goes down")
888         verify_event(self, e, expected_state=BFDState.down)
889
890     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
891     def test_modify_detect_mult(self):
892         """ modify detect multiplier """
893         bfd_session_up(self)
894         p = wait_for_bfd_packet(self)
895         self.vpp_session.modify_parameters(detect_mult=1)
896         p = wait_for_bfd_packet(
897             self, pcap_time_min=time.time() - self.vpp_clock_offset)
898         self.assert_equal(self.vpp_session.detect_mult,
899                           p[BFD].detect_mult,
900                           "detect mult")
901         # poll bit must not be set
902         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
903                          "Poll bit not set in BFD packet")
904         self.vpp_session.modify_parameters(detect_mult=10)
905         p = wait_for_bfd_packet(
906             self, pcap_time_min=time.time() - self.vpp_clock_offset)
907         self.assert_equal(self.vpp_session.detect_mult,
908                           p[BFD].detect_mult,
909                           "detect mult")
910         # poll bit must not be set
911         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
912                          "Poll bit not set in BFD packet")
913
914     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
915     def test_queued_poll(self):
916         """ test poll sequence queueing """
917         bfd_session_up(self)
918         p = wait_for_bfd_packet(self)
919         self.vpp_session.modify_parameters(
920             required_min_rx=2 * self.vpp_session.required_min_rx)
921         p = wait_for_bfd_packet(self)
922         poll_sequence_start = time.time()
923         poll_sequence_length_min = 0.5
924         send_final_after = time.time() + poll_sequence_length_min
925         # poll bit needs to be set
926         self.assertIn("P", p.sprintf("%BFD.flags%"),
927                       "Poll bit not set in BFD packet")
928         self.assert_equal(p[BFD].required_min_rx_interval,
929                           self.vpp_session.required_min_rx,
930                           "BFD required min rx interval")
931         self.vpp_session.modify_parameters(
932             required_min_rx=2 * self.vpp_session.required_min_rx)
933         # 2nd poll sequence should be queued now
934         # don't send the reply back yet, wait for some time to emulate
935         # longer round-trip time
936         packet_count = 0
937         while time.time() < send_final_after:
938             self.test_session.send_packet()
939             p = wait_for_bfd_packet(self)
940             self.assert_equal(len(self.vapi.collect_events()), 0,
941                               "number of bfd events")
942             self.assert_equal(p[BFD].required_min_rx_interval,
943                               self.vpp_session.required_min_rx,
944                               "BFD required min rx interval")
945             packet_count += 1
946             # poll bit must be set
947             self.assertIn("P", p.sprintf("%BFD.flags%"),
948                           "Poll bit not set in BFD packet")
949         final = self.test_session.create_packet()
950         final[BFD].flags = "F"
951         self.test_session.send_packet(final)
952         # finish 1st with final
953         poll_sequence_length = time.time() - poll_sequence_start
954         # vpp must wait for some time before starting new poll sequence
955         poll_no_2_started = False
956         for dummy in range(2 * packet_count):
957             p = wait_for_bfd_packet(self)
958             self.assert_equal(len(self.vapi.collect_events()), 0,
959                               "number of bfd events")
960             if "P" in p.sprintf("%BFD.flags%"):
961                 poll_no_2_started = True
962                 if time.time() < poll_sequence_start + poll_sequence_length:
963                     raise Exception("VPP started 2nd poll sequence too soon")
964                 final = self.test_session.create_packet()
965                 final[BFD].flags = "F"
966                 self.test_session.send_packet(final)
967                 break
968             else:
969                 self.test_session.send_packet()
970         self.assertTrue(poll_no_2_started, "2nd poll sequence not performed")
971         # finish 2nd with final
972         final = self.test_session.create_packet()
973         final[BFD].flags = "F"
974         self.test_session.send_packet(final)
975         p = wait_for_bfd_packet(self)
976         # poll bit must not be set
977         self.assertNotIn("P", p.sprintf("%BFD.flags%"),
978                          "Poll bit set in BFD packet")
979
980     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
981     def test_poll_response(self):
982         """ test correct response to control frame with poll bit set """
983         bfd_session_up(self)
984         poll = self.test_session.create_packet()
985         poll[BFD].flags = "P"
986         self.test_session.send_packet(poll)
987         final = wait_for_bfd_packet(
988             self, pcap_time_min=time.time() - self.vpp_clock_offset)
989         self.assertIn("F", final.sprintf("%BFD.flags%"))
990
991     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
992     def test_no_periodic_if_remote_demand(self):
993         """ no periodic frames outside poll sequence if remote demand set """
994         bfd_session_up(self)
995         demand = self.test_session.create_packet()
996         demand[BFD].flags = "D"
997         self.test_session.send_packet(demand)
998         transmit_time = 0.9 \
999             * max(self.vpp_session.required_min_rx,
1000                   self.test_session.desired_min_tx) \
1001             / USEC_IN_SEC
1002         count = 0
1003         for dummy in range(self.test_session.detect_mult * 2):
1004             time.sleep(transmit_time)
1005             self.test_session.send_packet(demand)
1006             try:
1007                 p = wait_for_bfd_packet(self, timeout=0)
1008                 self.logger.error(ppp("Received unexpected packet:", p))
1009                 count += 1
1010             except CaptureTimeoutError:
1011                 pass
1012         events = self.vapi.collect_events()
1013         for e in events:
1014             self.logger.error("Received unexpected event: %s", e)
1015         self.assert_equal(count, 0, "number of packets received")
1016         self.assert_equal(len(events), 0, "number of events received")
1017
1018     def test_echo_looped_back(self):
1019         """ echo packets looped back """
1020         # don't need a session in this case..
1021         self.vpp_session.remove_vpp_config()
1022         self.pg0.enable_capture()
1023         echo_packet_count = 10
1024         # random source port low enough to increment a few times..
1025         udp_sport_tx = randint(1, 50000)
1026         udp_sport_rx = udp_sport_tx
1027         echo_packet = (Ether(src=self.pg0.remote_mac,
1028                              dst=self.pg0.local_mac) /
1029                        IP(src=self.pg0.remote_ip4,
1030                           dst=self.pg0.remote_ip4) /
1031                        UDP(dport=BFD.udp_dport_echo) /
1032                        Raw("this should be looped back"))
1033         for dummy in range(echo_packet_count):
1034             self.sleep(.01, "delay between echo packets")
1035             echo_packet[UDP].sport = udp_sport_tx
1036             udp_sport_tx += 1
1037             self.logger.debug(ppp("Sending packet:", echo_packet))
1038             self.pg0.add_stream(echo_packet)
1039             self.pg_start()
1040         for dummy in range(echo_packet_count):
1041             p = self.pg0.wait_for_packet(1)
1042             self.logger.debug(ppp("Got packet:", p))
1043             ether = p[Ether]
1044             self.assert_equal(self.pg0.remote_mac,
1045                               ether.dst, "Destination MAC")
1046             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1047             ip = p[IP]
1048             self.assert_equal(self.pg0.remote_ip4, ip.dst, "Destination IP")
1049             self.assert_equal(self.pg0.remote_ip4, ip.src, "Destination IP")
1050             udp = p[UDP]
1051             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1052                               "UDP destination port")
1053             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1054             udp_sport_rx += 1
1055             # need to compare the hex payload here, otherwise BFD_vpp_echo
1056             # gets in way
1057             self.assertEqual(str(p[UDP].payload),
1058                              str(echo_packet[UDP].payload),
1059                              "Received packet is not the echo packet sent")
1060         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1061                           "ECHO packet identifier for test purposes)")
1062
1063     def test_echo(self):
1064         """ echo function """
1065         bfd_session_up(self)
1066         self.test_session.update(required_min_echo_rx=50000)
1067         self.test_session.send_packet()
1068         detection_time = self.test_session.detect_mult *\
1069             self.vpp_session.required_min_rx / USEC_IN_SEC
1070         # echo shouldn't work without echo source set
1071         for dummy in range(3):
1072             sleep = 0.75 * detection_time
1073             self.sleep(sleep, "delay before sending bfd packet")
1074             self.test_session.send_packet()
1075         p = wait_for_bfd_packet(
1076             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1077         self.assert_equal(p[BFD].required_min_rx_interval,
1078                           self.vpp_session.required_min_rx,
1079                           "BFD required min rx interval")
1080         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1081         # should be turned on - loopback echo packets
1082         for dummy in range(3):
1083             loop_until = time.time() + 0.75 * detection_time
1084             while time.time() < loop_until:
1085                 p = self.pg0.wait_for_packet(1)
1086                 self.logger.debug(ppp("Got packet:", p))
1087                 if p[UDP].dport == BFD.udp_dport_echo:
1088                     self.assert_equal(
1089                         p[IP].dst, self.pg0.local_ip4, "BFD ECHO dst IP")
1090                     self.assertNotEqual(p[IP].src, self.loopback0.local_ip4,
1091                                         "BFD ECHO src IP equal to loopback IP")
1092                     self.logger.debug(ppp("Looping back packet:", p))
1093                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1094                                       "ECHO packet destination MAC address")
1095                     p[Ether].dst = self.pg0.local_mac
1096                     self.pg0.add_stream(p)
1097                     self.pg_start()
1098                 elif p.haslayer(BFD):
1099                     self.assertGreaterEqual(p[BFD].required_min_rx_interval,
1100                                             1000000)
1101                     if "P" in p.sprintf("%BFD.flags%"):
1102                         final = self.test_session.create_packet()
1103                         final[BFD].flags = "F"
1104                         self.test_session.send_packet(final)
1105                 else:
1106                     raise Exception(ppp("Received unknown packet:", p))
1107
1108                 self.assert_equal(len(self.vapi.collect_events()), 0,
1109                                   "number of bfd events")
1110             self.test_session.send_packet()
1111
1112     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1113     def test_echo_fail(self):
1114         """ session goes down if echo function fails """
1115         bfd_session_up(self)
1116         self.test_session.update(required_min_echo_rx=50000)
1117         self.test_session.send_packet()
1118         detection_time = self.test_session.detect_mult *\
1119             self.vpp_session.required_min_rx / USEC_IN_SEC
1120         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1121         # echo function should be used now, but we will drop the echo packets
1122         verified_diag = False
1123         for dummy in range(3):
1124             loop_until = time.time() + 0.75 * detection_time
1125             while time.time() < loop_until:
1126                 p = self.pg0.wait_for_packet(1)
1127                 self.logger.debug(ppp("Got packet:", p))
1128                 if p[UDP].dport == BFD.udp_dport_echo:
1129                     # dropped
1130                     pass
1131                 elif p.haslayer(BFD):
1132                     if "P" in p.sprintf("%BFD.flags%"):
1133                         self.assertGreaterEqual(
1134                             p[BFD].required_min_rx_interval,
1135                             1000000)
1136                         final = self.test_session.create_packet()
1137                         final[BFD].flags = "F"
1138                         self.test_session.send_packet(final)
1139                     if p[BFD].state == BFDState.down:
1140                         self.assert_equal(p[BFD].diag,
1141                                           BFDDiagCode.echo_function_failed,
1142                                           BFDDiagCode)
1143                         verified_diag = True
1144                 else:
1145                     raise Exception(ppp("Received unknown packet:", p))
1146             self.test_session.send_packet()
1147         events = self.vapi.collect_events()
1148         self.assert_equal(len(events), 1, "number of bfd events")
1149         self.assert_equal(events[0].state, BFDState.down, BFDState)
1150         self.assertTrue(verified_diag, "Incorrect diagnostics code received")
1151
1152     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1153     def test_echo_stop(self):
1154         """ echo function stops if peer sets required min echo rx zero """
1155         bfd_session_up(self)
1156         self.test_session.update(required_min_echo_rx=50000)
1157         self.test_session.send_packet()
1158         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1159         # wait for first echo packet
1160         while True:
1161             p = self.pg0.wait_for_packet(1)
1162             self.logger.debug(ppp("Got packet:", p))
1163             if p[UDP].dport == BFD.udp_dport_echo:
1164                 self.logger.debug(ppp("Looping back packet:", p))
1165                 p[Ether].dst = self.pg0.local_mac
1166                 self.pg0.add_stream(p)
1167                 self.pg_start()
1168                 break
1169             elif p.haslayer(BFD):
1170                 # ignore BFD
1171                 pass
1172             else:
1173                 raise Exception(ppp("Received unknown packet:", p))
1174         self.test_session.update(required_min_echo_rx=0)
1175         self.test_session.send_packet()
1176         # echo packets shouldn't arrive anymore
1177         for dummy in range(5):
1178             wait_for_bfd_packet(
1179                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1180             self.test_session.send_packet()
1181             events = self.vapi.collect_events()
1182             self.assert_equal(len(events), 0, "number of bfd events")
1183
1184     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1185     def test_echo_source_removed(self):
1186         """ echo function stops if echo source is removed """
1187         bfd_session_up(self)
1188         self.test_session.update(required_min_echo_rx=50000)
1189         self.test_session.send_packet()
1190         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1191         # wait for first echo packet
1192         while True:
1193             p = self.pg0.wait_for_packet(1)
1194             self.logger.debug(ppp("Got packet:", p))
1195             if p[UDP].dport == BFD.udp_dport_echo:
1196                 self.logger.debug(ppp("Looping back packet:", p))
1197                 p[Ether].dst = self.pg0.local_mac
1198                 self.pg0.add_stream(p)
1199                 self.pg_start()
1200                 break
1201             elif p.haslayer(BFD):
1202                 # ignore BFD
1203                 pass
1204             else:
1205                 raise Exception(ppp("Received unknown packet:", p))
1206         self.vapi.bfd_udp_del_echo_source()
1207         self.test_session.send_packet()
1208         # echo packets shouldn't arrive anymore
1209         for dummy in range(5):
1210             wait_for_bfd_packet(
1211                 self, pcap_time_min=time.time() - self.vpp_clock_offset)
1212             self.test_session.send_packet()
1213             events = self.vapi.collect_events()
1214             self.assert_equal(len(events), 0, "number of bfd events")
1215
1216     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1217     def test_stale_echo(self):
1218         """ stale echo packets don't keep a session up """
1219         bfd_session_up(self)
1220         self.test_session.update(required_min_echo_rx=50000)
1221         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1222         self.test_session.send_packet()
1223         # should be turned on - loopback echo packets
1224         echo_packet = None
1225         timeout_at = None
1226         timeout_ok = False
1227         for dummy in range(10 * self.vpp_session.detect_mult):
1228             p = self.pg0.wait_for_packet(1)
1229             if p[UDP].dport == BFD.udp_dport_echo:
1230                 if echo_packet is None:
1231                     self.logger.debug(ppp("Got first echo packet:", p))
1232                     echo_packet = p
1233                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1234                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1235                 else:
1236                     self.logger.debug(ppp("Got followup echo packet:", p))
1237                 self.logger.debug(ppp("Looping back first echo packet:", p))
1238                 echo_packet[Ether].dst = self.pg0.local_mac
1239                 self.pg0.add_stream(echo_packet)
1240                 self.pg_start()
1241             elif p.haslayer(BFD):
1242                 self.logger.debug(ppp("Got packet:", p))
1243                 if "P" in p.sprintf("%BFD.flags%"):
1244                     final = self.test_session.create_packet()
1245                     final[BFD].flags = "F"
1246                     self.test_session.send_packet(final)
1247                 if p[BFD].state == BFDState.down:
1248                     self.assertIsNotNone(
1249                         timeout_at,
1250                         "Session went down before first echo packet received")
1251                     now = time.time()
1252                     self.assertGreaterEqual(
1253                         now, timeout_at,
1254                         "Session timeout at %s, but is expected at %s" %
1255                         (now, timeout_at))
1256                     self.assert_equal(p[BFD].diag,
1257                                       BFDDiagCode.echo_function_failed,
1258                                       BFDDiagCode)
1259                     events = self.vapi.collect_events()
1260                     self.assert_equal(len(events), 1, "number of bfd events")
1261                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1262                     timeout_ok = True
1263                     break
1264             else:
1265                 raise Exception(ppp("Received unknown packet:", p))
1266             self.test_session.send_packet()
1267         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1268
1269     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1270     def test_invalid_echo_checksum(self):
1271         """ echo packets with invalid checksum don't keep a session up """
1272         bfd_session_up(self)
1273         self.test_session.update(required_min_echo_rx=50000)
1274         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1275         self.test_session.send_packet()
1276         # should be turned on - loopback echo packets
1277         timeout_at = None
1278         timeout_ok = False
1279         for dummy in range(10 * self.vpp_session.detect_mult):
1280             p = self.pg0.wait_for_packet(1)
1281             if p[UDP].dport == BFD.udp_dport_echo:
1282                 self.logger.debug(ppp("Got echo packet:", p))
1283                 if timeout_at is None:
1284                     timeout_at = time.time() + self.vpp_session.detect_mult * \
1285                         self.test_session.required_min_echo_rx / USEC_IN_SEC
1286                 p[BFD_vpp_echo].checksum = getrandbits(64)
1287                 p[Ether].dst = self.pg0.local_mac
1288                 self.logger.debug(ppp("Looping back modified echo packet:", p))
1289                 self.pg0.add_stream(p)
1290                 self.pg_start()
1291             elif p.haslayer(BFD):
1292                 self.logger.debug(ppp("Got packet:", p))
1293                 if "P" in p.sprintf("%BFD.flags%"):
1294                     final = self.test_session.create_packet()
1295                     final[BFD].flags = "F"
1296                     self.test_session.send_packet(final)
1297                 if p[BFD].state == BFDState.down:
1298                     self.assertIsNotNone(
1299                         timeout_at,
1300                         "Session went down before first echo packet received")
1301                     now = time.time()
1302                     self.assertGreaterEqual(
1303                         now, timeout_at,
1304                         "Session timeout at %s, but is expected at %s" %
1305                         (now, timeout_at))
1306                     self.assert_equal(p[BFD].diag,
1307                                       BFDDiagCode.echo_function_failed,
1308                                       BFDDiagCode)
1309                     events = self.vapi.collect_events()
1310                     self.assert_equal(len(events), 1, "number of bfd events")
1311                     self.assert_equal(events[0].state, BFDState.down, BFDState)
1312                     timeout_ok = True
1313                     break
1314             else:
1315                 raise Exception(ppp("Received unknown packet:", p))
1316             self.test_session.send_packet()
1317         self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
1318
1319     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1320     def test_admin_up_down(self):
1321         """ put session admin-up and admin-down """
1322         bfd_session_up(self)
1323         self.vpp_session.admin_down()
1324         self.pg0.enable_capture()
1325         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1326         verify_event(self, e, expected_state=BFDState.admin_down)
1327         for dummy in range(2):
1328             p = wait_for_bfd_packet(self)
1329             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1330         # try to bring session up - shouldn't be possible
1331         self.test_session.update(state=BFDState.init)
1332         self.test_session.send_packet()
1333         for dummy in range(2):
1334             p = wait_for_bfd_packet(self)
1335             self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
1336         self.vpp_session.admin_up()
1337         self.test_session.update(state=BFDState.down)
1338         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1339         verify_event(self, e, expected_state=BFDState.down)
1340         p = wait_for_bfd_packet(
1341             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1342         self.assert_equal(p[BFD].state, BFDState.down, BFDState)
1343         self.test_session.send_packet()
1344         p = wait_for_bfd_packet(
1345             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1346         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1347         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1348         verify_event(self, e, expected_state=BFDState.init)
1349         self.test_session.update(state=BFDState.up)
1350         self.test_session.send_packet()
1351         p = wait_for_bfd_packet(
1352             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1353         self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1354         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1355         verify_event(self, e, expected_state=BFDState.up)
1356
1357     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1358     def test_config_change_remote_demand(self):
1359         """ configuration change while peer in demand mode """
1360         bfd_session_up(self)
1361         demand = self.test_session.create_packet()
1362         demand[BFD].flags = "D"
1363         self.test_session.send_packet(demand)
1364         self.vpp_session.modify_parameters(
1365             required_min_rx=2 * self.vpp_session.required_min_rx)
1366         p = wait_for_bfd_packet(
1367             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1368         # poll bit must be set
1369         self.assertIn("P", p.sprintf("%BFD.flags%"), "Poll bit not set")
1370         # terminate poll sequence
1371         final = self.test_session.create_packet()
1372         final[BFD].flags = "D+F"
1373         self.test_session.send_packet(final)
1374         # vpp should be quiet now again
1375         transmit_time = 0.9 \
1376             * max(self.vpp_session.required_min_rx,
1377                   self.test_session.desired_min_tx) \
1378             / USEC_IN_SEC
1379         count = 0
1380         for dummy in range(self.test_session.detect_mult * 2):
1381             time.sleep(transmit_time)
1382             self.test_session.send_packet(demand)
1383             try:
1384                 p = wait_for_bfd_packet(self, timeout=0)
1385                 self.logger.error(ppp("Received unexpected packet:", p))
1386                 count += 1
1387             except CaptureTimeoutError:
1388                 pass
1389         events = self.vapi.collect_events()
1390         for e in events:
1391             self.logger.error("Received unexpected event: %s", e)
1392         self.assert_equal(count, 0, "number of packets received")
1393         self.assert_equal(len(events), 0, "number of events received")
1394
1395
1396 class BFD6TestCase(VppTestCase):
1397     """Bidirectional Forwarding Detection (BFD) (IPv6) """
1398
1399     pg0 = None
1400     vpp_clock_offset = None
1401     vpp_session = None
1402     test_session = None
1403
1404     @classmethod
1405     def setUpClass(cls):
1406         super(BFD6TestCase, cls).setUpClass()
1407         try:
1408             cls.create_pg_interfaces([0])
1409             cls.pg0.config_ip6()
1410             cls.pg0.configure_ipv6_neighbors()
1411             cls.pg0.admin_up()
1412             cls.pg0.resolve_ndp()
1413             cls.create_loopback_interfaces([0])
1414             cls.loopback0 = cls.lo_interfaces[0]
1415             cls.loopback0.config_ip6()
1416             cls.loopback0.admin_up()
1417
1418         except Exception:
1419             super(BFD6TestCase, cls).tearDownClass()
1420             raise
1421
1422     def setUp(self):
1423         super(BFD6TestCase, self).setUp()
1424         self.factory = AuthKeyFactory()
1425         self.vapi.want_bfd_events()
1426         self.pg0.enable_capture()
1427         try:
1428             self.vpp_session = VppBFDUDPSession(self, self.pg0,
1429                                                 self.pg0.remote_ip6,
1430                                                 af=AF_INET6)
1431             self.vpp_session.add_vpp_config()
1432             self.vpp_session.admin_up()
1433             self.test_session = BFDTestSession(self, self.pg0, AF_INET6)
1434             self.logger.debug(self.vapi.cli("show adj nbr"))
1435         except:
1436             self.vapi.want_bfd_events(enable_disable=0)
1437             raise
1438
1439     def tearDown(self):
1440         if not self.vpp_dead:
1441             self.vapi.want_bfd_events(enable_disable=0)
1442         self.vapi.collect_events()  # clear the event queue
1443         super(BFD6TestCase, self).tearDown()
1444
1445     def test_session_up(self):
1446         """ bring BFD session up """
1447         bfd_session_up(self)
1448
1449     def test_session_up_by_ip(self):
1450         """ bring BFD session up - first frame looked up by address pair """
1451         self.logger.info("BFD: Sending Slow control frame")
1452         self.test_session.update(my_discriminator=randint(0, 40000000))
1453         self.test_session.send_packet()
1454         self.pg0.enable_capture()
1455         p = self.pg0.wait_for_packet(1)
1456         self.assert_equal(p[BFD].your_discriminator,
1457                           self.test_session.my_discriminator,
1458                           "BFD - your discriminator")
1459         self.assert_equal(p[BFD].state, BFDState.init, BFDState)
1460         self.test_session.update(your_discriminator=p[BFD].my_discriminator,
1461                                  state=BFDState.up)
1462         self.logger.info("BFD: Waiting for event")
1463         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1464         verify_event(self, e, expected_state=BFDState.init)
1465         self.logger.info("BFD: Sending Up")
1466         self.test_session.send_packet()
1467         self.logger.info("BFD: Waiting for event")
1468         e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
1469         verify_event(self, e, expected_state=BFDState.up)
1470         self.logger.info("BFD: Session is Up")
1471         self.test_session.update(state=BFDState.up)
1472         self.test_session.send_packet()
1473         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1474
1475     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1476     def test_hold_up(self):
1477         """ hold BFD session up """
1478         bfd_session_up(self)
1479         for dummy in range(self.test_session.detect_mult * 2):
1480             wait_for_bfd_packet(self)
1481             self.test_session.send_packet()
1482         self.assert_equal(len(self.vapi.collect_events()), 0,
1483                           "number of bfd events")
1484         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1485
1486     def test_echo_looped_back(self):
1487         """ echo packets looped back """
1488         # don't need a session in this case..
1489         self.vpp_session.remove_vpp_config()
1490         self.pg0.enable_capture()
1491         echo_packet_count = 10
1492         # random source port low enough to increment a few times..
1493         udp_sport_tx = randint(1, 50000)
1494         udp_sport_rx = udp_sport_tx
1495         echo_packet = (Ether(src=self.pg0.remote_mac,
1496                              dst=self.pg0.local_mac) /
1497                        IPv6(src=self.pg0.remote_ip6,
1498                             dst=self.pg0.remote_ip6) /
1499                        UDP(dport=BFD.udp_dport_echo) /
1500                        Raw("this should be looped back"))
1501         for dummy in range(echo_packet_count):
1502             self.sleep(.01, "delay between echo packets")
1503             echo_packet[UDP].sport = udp_sport_tx
1504             udp_sport_tx += 1
1505             self.logger.debug(ppp("Sending packet:", echo_packet))
1506             self.pg0.add_stream(echo_packet)
1507             self.pg_start()
1508         for dummy in range(echo_packet_count):
1509             p = self.pg0.wait_for_packet(1)
1510             self.logger.debug(ppp("Got packet:", p))
1511             ether = p[Ether]
1512             self.assert_equal(self.pg0.remote_mac,
1513                               ether.dst, "Destination MAC")
1514             self.assert_equal(self.pg0.local_mac, ether.src, "Source MAC")
1515             ip = p[IPv6]
1516             self.assert_equal(self.pg0.remote_ip6, ip.dst, "Destination IP")
1517             self.assert_equal(self.pg0.remote_ip6, ip.src, "Destination IP")
1518             udp = p[UDP]
1519             self.assert_equal(udp.dport, BFD.udp_dport_echo,
1520                               "UDP destination port")
1521             self.assert_equal(udp.sport, udp_sport_rx, "UDP source port")
1522             udp_sport_rx += 1
1523             # need to compare the hex payload here, otherwise BFD_vpp_echo
1524             # gets in way
1525             self.assertEqual(str(p[UDP].payload),
1526                              str(echo_packet[UDP].payload),
1527                              "Received packet is not the echo packet sent")
1528         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1529                           "ECHO packet identifier for test purposes)")
1530         self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
1531                           "ECHO packet identifier for test purposes)")
1532
1533     def test_echo(self):
1534         """ echo function used """
1535         bfd_session_up(self)
1536         self.test_session.update(required_min_echo_rx=50000)
1537         self.test_session.send_packet()
1538         detection_time = self.test_session.detect_mult *\
1539             self.vpp_session.required_min_rx / USEC_IN_SEC
1540         # echo shouldn't work without echo source set
1541         for dummy in range(3):
1542             sleep = 0.75 * detection_time
1543             self.sleep(sleep, "delay before sending bfd packet")
1544             self.test_session.send_packet()
1545         p = wait_for_bfd_packet(
1546             self, pcap_time_min=time.time() - self.vpp_clock_offset)
1547         self.assert_equal(p[BFD].required_min_rx_interval,
1548                           self.vpp_session.required_min_rx,
1549                           "BFD required min rx interval")
1550         self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
1551         # should be turned on - loopback echo packets
1552         for dummy in range(3):
1553             loop_until = time.time() + 0.75 * detection_time
1554             while time.time() < loop_until:
1555                 p = self.pg0.wait_for_packet(1)
1556                 self.logger.debug(ppp("Got packet:", p))
1557                 if p[UDP].dport == BFD.udp_dport_echo:
1558                     self.assert_equal(
1559                         p[IPv6].dst, self.pg0.local_ip6, "BFD ECHO dst IP")
1560                     self.assertNotEqual(p[IPv6].src, self.loopback0.local_ip6,
1561                                         "BFD ECHO src IP equal to loopback IP")
1562                     self.logger.debug(ppp("Looping back packet:", p))
1563                     self.assert_equal(p[Ether].dst, self.pg0.remote_mac,
1564                                       "ECHO packet destination MAC address")
1565                     p[Ether].dst = self.pg0.local_mac
1566                     self.pg0.add_stream(p)
1567                     self.pg_start()
1568                 elif p.haslayer(BFD):
1569                     self.assertGreaterEqual(p[BFD].required_min_rx_interval,
1570                                             1000000)
1571                     if "P" in p.sprintf("%BFD.flags%"):
1572                         final = self.test_session.create_packet()
1573                         final[BFD].flags = "F"
1574                         self.test_session.send_packet(final)
1575                 else:
1576                     raise Exception(ppp("Received unknown packet:", p))
1577
1578                 self.assert_equal(len(self.vapi.collect_events()), 0,
1579                                   "number of bfd events")
1580             self.test_session.send_packet()
1581
1582
1583 class BFDSHA1TestCase(VppTestCase):
1584     """Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
1585
1586     pg0 = None
1587     vpp_clock_offset = None
1588     vpp_session = None
1589     test_session = None
1590
1591     @classmethod
1592     def setUpClass(cls):
1593         super(BFDSHA1TestCase, cls).setUpClass()
1594         try:
1595             cls.create_pg_interfaces([0])
1596             cls.pg0.config_ip4()
1597             cls.pg0.admin_up()
1598             cls.pg0.resolve_arp()
1599
1600         except Exception:
1601             super(BFDSHA1TestCase, cls).tearDownClass()
1602             raise
1603
1604     def setUp(self):
1605         super(BFDSHA1TestCase, self).setUp()
1606         self.factory = AuthKeyFactory()
1607         self.vapi.want_bfd_events()
1608         self.pg0.enable_capture()
1609
1610     def tearDown(self):
1611         if not self.vpp_dead:
1612             self.vapi.want_bfd_events(enable_disable=0)
1613         self.vapi.collect_events()  # clear the event queue
1614         super(BFDSHA1TestCase, self).tearDown()
1615
1616     def test_session_up(self):
1617         """ bring BFD session up """
1618         key = self.factory.create_random_key(self)
1619         key.add_vpp_config()
1620         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1621                                             self.pg0.remote_ip4,
1622                                             sha1_key=key)
1623         self.vpp_session.add_vpp_config()
1624         self.vpp_session.admin_up()
1625         self.test_session = BFDTestSession(
1626             self, self.pg0, AF_INET, sha1_key=key,
1627             bfd_key_id=self.vpp_session.bfd_key_id)
1628         bfd_session_up(self)
1629
1630     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1631     def test_hold_up(self):
1632         """ hold BFD session up """
1633         key = self.factory.create_random_key(self)
1634         key.add_vpp_config()
1635         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1636                                             self.pg0.remote_ip4,
1637                                             sha1_key=key)
1638         self.vpp_session.add_vpp_config()
1639         self.vpp_session.admin_up()
1640         self.test_session = BFDTestSession(
1641             self, self.pg0, AF_INET, sha1_key=key,
1642             bfd_key_id=self.vpp_session.bfd_key_id)
1643         bfd_session_up(self)
1644         for dummy in range(self.test_session.detect_mult * 2):
1645             wait_for_bfd_packet(self)
1646             self.test_session.send_packet()
1647         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1648
1649     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1650     def test_hold_up_meticulous(self):
1651         """ hold BFD session up - meticulous auth """
1652         key = self.factory.create_random_key(
1653             self, BFDAuthType.meticulous_keyed_sha1)
1654         key.add_vpp_config()
1655         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1656                                             self.pg0.remote_ip4, sha1_key=key)
1657         self.vpp_session.add_vpp_config()
1658         self.vpp_session.admin_up()
1659         # specify sequence number so that it wraps
1660         self.test_session = BFDTestSession(
1661             self, self.pg0, AF_INET, sha1_key=key,
1662             bfd_key_id=self.vpp_session.bfd_key_id,
1663             our_seq_number=0xFFFFFFFF - 4)
1664         bfd_session_up(self)
1665         for dummy in range(30):
1666             wait_for_bfd_packet(self)
1667             self.test_session.inc_seq_num()
1668             self.test_session.send_packet()
1669         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1670
1671     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1672     def test_send_bad_seq_number(self):
1673         """ session is not kept alive by msgs with bad sequence numbers"""
1674         key = self.factory.create_random_key(
1675             self, BFDAuthType.meticulous_keyed_sha1)
1676         key.add_vpp_config()
1677         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1678                                             self.pg0.remote_ip4, sha1_key=key)
1679         self.vpp_session.add_vpp_config()
1680         self.test_session = BFDTestSession(
1681             self, self.pg0, AF_INET, sha1_key=key,
1682             bfd_key_id=self.vpp_session.bfd_key_id)
1683         bfd_session_up(self)
1684         detection_time = self.test_session.detect_mult *\
1685             self.vpp_session.required_min_rx / USEC_IN_SEC
1686         send_until = time.time() + 2 * detection_time
1687         while time.time() < send_until:
1688             self.test_session.send_packet()
1689             self.sleep(0.7 * self.vpp_session.required_min_rx / USEC_IN_SEC,
1690                        "time between bfd packets")
1691         e = self.vapi.collect_events()
1692         # session should be down now, because the sequence numbers weren't
1693         # updated
1694         self.assert_equal(len(e), 1, "number of bfd events")
1695         verify_event(self, e[0], expected_state=BFDState.down)
1696
1697     def execute_rogue_session_scenario(self, vpp_bfd_udp_session,
1698                                        legitimate_test_session,
1699                                        rogue_test_session,
1700                                        rogue_bfd_values=None):
1701         """ execute a rogue session interaction scenario
1702
1703         1. create vpp session, add config
1704         2. bring the legitimate session up
1705         3. copy the bfd values from legitimate session to rogue session
1706         4. apply rogue_bfd_values to rogue session
1707         5. set rogue session state to down
1708         6. send message to take the session down from the rogue session
1709         7. assert that the legitimate session is unaffected
1710         """
1711
1712         self.vpp_session = vpp_bfd_udp_session
1713         self.vpp_session.add_vpp_config()
1714         self.test_session = legitimate_test_session
1715         # bring vpp session up
1716         bfd_session_up(self)
1717         # send packet from rogue session
1718         rogue_test_session.update(
1719             my_discriminator=self.test_session.my_discriminator,
1720             your_discriminator=self.test_session.your_discriminator,
1721             desired_min_tx=self.test_session.desired_min_tx,
1722             required_min_rx=self.test_session.required_min_rx,
1723             detect_mult=self.test_session.detect_mult,
1724             diag=self.test_session.diag,
1725             state=self.test_session.state,
1726             auth_type=self.test_session.auth_type)
1727         if rogue_bfd_values:
1728             rogue_test_session.update(**rogue_bfd_values)
1729         rogue_test_session.update(state=BFDState.down)
1730         rogue_test_session.send_packet()
1731         wait_for_bfd_packet(self)
1732         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1733
1734     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1735     def test_mismatch_auth(self):
1736         """ session is not brought down by unauthenticated msg """
1737         key = self.factory.create_random_key(self)
1738         key.add_vpp_config()
1739         vpp_session = VppBFDUDPSession(
1740             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1741         legitimate_test_session = BFDTestSession(
1742             self, self.pg0, AF_INET, sha1_key=key,
1743             bfd_key_id=vpp_session.bfd_key_id)
1744         rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
1745         self.execute_rogue_session_scenario(vpp_session,
1746                                             legitimate_test_session,
1747                                             rogue_test_session)
1748
1749     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1750     def test_mismatch_bfd_key_id(self):
1751         """ session is not brought down by msg with non-existent key-id """
1752         key = self.factory.create_random_key(self)
1753         key.add_vpp_config()
1754         vpp_session = VppBFDUDPSession(
1755             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1756         # pick a different random bfd key id
1757         x = randint(0, 255)
1758         while x == vpp_session.bfd_key_id:
1759             x = randint(0, 255)
1760         legitimate_test_session = BFDTestSession(
1761             self, self.pg0, AF_INET, sha1_key=key,
1762             bfd_key_id=vpp_session.bfd_key_id)
1763         rogue_test_session = BFDTestSession(
1764             self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
1765         self.execute_rogue_session_scenario(vpp_session,
1766                                             legitimate_test_session,
1767                                             rogue_test_session)
1768
1769     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1770     def test_mismatched_auth_type(self):
1771         """ session is not brought down by msg with wrong auth type """
1772         key = self.factory.create_random_key(self)
1773         key.add_vpp_config()
1774         vpp_session = VppBFDUDPSession(
1775             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
1776         legitimate_test_session = BFDTestSession(
1777             self, self.pg0, AF_INET, sha1_key=key,
1778             bfd_key_id=vpp_session.bfd_key_id)
1779         rogue_test_session = BFDTestSession(
1780             self, self.pg0, AF_INET, sha1_key=key,
1781             bfd_key_id=vpp_session.bfd_key_id)
1782         self.execute_rogue_session_scenario(
1783             vpp_session, legitimate_test_session, rogue_test_session,
1784             {'auth_type': BFDAuthType.keyed_md5})
1785
1786     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1787     def test_restart(self):
1788         """ simulate remote peer restart and resynchronization """
1789         key = self.factory.create_random_key(
1790             self, BFDAuthType.meticulous_keyed_sha1)
1791         key.add_vpp_config()
1792         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1793                                             self.pg0.remote_ip4, sha1_key=key)
1794         self.vpp_session.add_vpp_config()
1795         self.test_session = BFDTestSession(
1796             self, self.pg0, AF_INET, sha1_key=key,
1797             bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
1798         bfd_session_up(self)
1799         # don't send any packets for 2*detection_time
1800         detection_time = self.test_session.detect_mult *\
1801             self.vpp_session.required_min_rx / USEC_IN_SEC
1802         self.sleep(2 * detection_time, "simulating peer restart")
1803         events = self.vapi.collect_events()
1804         self.assert_equal(len(events), 1, "number of bfd events")
1805         verify_event(self, events[0], expected_state=BFDState.down)
1806         self.test_session.update(state=BFDState.down)
1807         # reset sequence number
1808         self.test_session.our_seq_number = 0
1809         self.test_session.vpp_seq_number = None
1810         # now throw away any pending packets
1811         self.pg0.enable_capture()
1812         bfd_session_up(self)
1813
1814
1815 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1816 class BFDAuthOnOffTestCase(VppTestCase):
1817     """Bidirectional Forwarding Detection (BFD) (changing auth) """
1818
1819     pg0 = None
1820     vpp_session = None
1821     test_session = None
1822
1823     @classmethod
1824     def setUpClass(cls):
1825         super(BFDAuthOnOffTestCase, cls).setUpClass()
1826         try:
1827             cls.create_pg_interfaces([0])
1828             cls.pg0.config_ip4()
1829             cls.pg0.admin_up()
1830             cls.pg0.resolve_arp()
1831
1832         except Exception:
1833             super(BFDAuthOnOffTestCase, cls).tearDownClass()
1834             raise
1835
1836     def setUp(self):
1837         super(BFDAuthOnOffTestCase, self).setUp()
1838         self.factory = AuthKeyFactory()
1839         self.vapi.want_bfd_events()
1840         self.pg0.enable_capture()
1841
1842     def tearDown(self):
1843         if not self.vpp_dead:
1844             self.vapi.want_bfd_events(enable_disable=0)
1845         self.vapi.collect_events()  # clear the event queue
1846         super(BFDAuthOnOffTestCase, self).tearDown()
1847
1848     def test_auth_on_immediate(self):
1849         """ turn auth on without disturbing session state (immediate) """
1850         key = self.factory.create_random_key(self)
1851         key.add_vpp_config()
1852         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1853                                             self.pg0.remote_ip4)
1854         self.vpp_session.add_vpp_config()
1855         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
1856         bfd_session_up(self)
1857         for dummy in range(self.test_session.detect_mult * 2):
1858             p = wait_for_bfd_packet(self)
1859             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1860             self.test_session.send_packet()
1861         self.vpp_session.activate_auth(key)
1862         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1863         self.test_session.sha1_key = key
1864         for dummy in range(self.test_session.detect_mult * 2):
1865             p = wait_for_bfd_packet(self)
1866             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1867             self.test_session.send_packet()
1868         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1869         self.assert_equal(len(self.vapi.collect_events()), 0,
1870                           "number of bfd events")
1871
1872     def test_auth_off_immediate(self):
1873         """ turn auth off without disturbing session state (immediate) """
1874         key = self.factory.create_random_key(self)
1875         key.add_vpp_config()
1876         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1877                                             self.pg0.remote_ip4, sha1_key=key)
1878         self.vpp_session.add_vpp_config()
1879         self.test_session = BFDTestSession(
1880             self, self.pg0, AF_INET, sha1_key=key,
1881             bfd_key_id=self.vpp_session.bfd_key_id)
1882         bfd_session_up(self)
1883         # self.vapi.want_bfd_events(enable_disable=0)
1884         for dummy in range(self.test_session.detect_mult * 2):
1885             p = wait_for_bfd_packet(self)
1886             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1887             self.test_session.inc_seq_num()
1888             self.test_session.send_packet()
1889         self.vpp_session.deactivate_auth()
1890         self.test_session.bfd_key_id = None
1891         self.test_session.sha1_key = None
1892         for dummy in range(self.test_session.detect_mult * 2):
1893             p = wait_for_bfd_packet(self)
1894             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1895             self.test_session.inc_seq_num()
1896             self.test_session.send_packet()
1897         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1898         self.assert_equal(len(self.vapi.collect_events()), 0,
1899                           "number of bfd events")
1900
1901     def test_auth_change_key_immediate(self):
1902         """ change auth key without disturbing session state (immediate) """
1903         key1 = self.factory.create_random_key(self)
1904         key1.add_vpp_config()
1905         key2 = self.factory.create_random_key(self)
1906         key2.add_vpp_config()
1907         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1908                                             self.pg0.remote_ip4, sha1_key=key1)
1909         self.vpp_session.add_vpp_config()
1910         self.test_session = BFDTestSession(
1911             self, self.pg0, AF_INET, sha1_key=key1,
1912             bfd_key_id=self.vpp_session.bfd_key_id)
1913         bfd_session_up(self)
1914         for dummy in range(self.test_session.detect_mult * 2):
1915             p = wait_for_bfd_packet(self)
1916             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1917             self.test_session.send_packet()
1918         self.vpp_session.activate_auth(key2)
1919         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1920         self.test_session.sha1_key = key2
1921         for dummy in range(self.test_session.detect_mult * 2):
1922             p = wait_for_bfd_packet(self)
1923             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1924             self.test_session.send_packet()
1925         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1926         self.assert_equal(len(self.vapi.collect_events()), 0,
1927                           "number of bfd events")
1928
1929     def test_auth_on_delayed(self):
1930         """ turn auth on without disturbing session state (delayed) """
1931         key = self.factory.create_random_key(self)
1932         key.add_vpp_config()
1933         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1934                                             self.pg0.remote_ip4)
1935         self.vpp_session.add_vpp_config()
1936         self.test_session = BFDTestSession(self, self.pg0, AF_INET)
1937         bfd_session_up(self)
1938         for dummy in range(self.test_session.detect_mult * 2):
1939             wait_for_bfd_packet(self)
1940             self.test_session.send_packet()
1941         self.vpp_session.activate_auth(key, delayed=True)
1942         for dummy in range(self.test_session.detect_mult * 2):
1943             p = wait_for_bfd_packet(self)
1944             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1945             self.test_session.send_packet()
1946         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
1947         self.test_session.sha1_key = key
1948         self.test_session.send_packet()
1949         for dummy in range(self.test_session.detect_mult * 2):
1950             p = wait_for_bfd_packet(self)
1951             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1952             self.test_session.send_packet()
1953         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1954         self.assert_equal(len(self.vapi.collect_events()), 0,
1955                           "number of bfd events")
1956
1957     def test_auth_off_delayed(self):
1958         """ turn auth off without disturbing session state (delayed) """
1959         key = self.factory.create_random_key(self)
1960         key.add_vpp_config()
1961         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1962                                             self.pg0.remote_ip4, sha1_key=key)
1963         self.vpp_session.add_vpp_config()
1964         self.test_session = BFDTestSession(
1965             self, self.pg0, AF_INET, sha1_key=key,
1966             bfd_key_id=self.vpp_session.bfd_key_id)
1967         bfd_session_up(self)
1968         for dummy in range(self.test_session.detect_mult * 2):
1969             p = wait_for_bfd_packet(self)
1970             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1971             self.test_session.send_packet()
1972         self.vpp_session.deactivate_auth(delayed=True)
1973         for dummy in range(self.test_session.detect_mult * 2):
1974             p = wait_for_bfd_packet(self)
1975             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1976             self.test_session.send_packet()
1977         self.test_session.bfd_key_id = None
1978         self.test_session.sha1_key = None
1979         self.test_session.send_packet()
1980         for dummy in range(self.test_session.detect_mult * 2):
1981             p = wait_for_bfd_packet(self)
1982             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
1983             self.test_session.send_packet()
1984         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
1985         self.assert_equal(len(self.vapi.collect_events()), 0,
1986                           "number of bfd events")
1987
1988     def test_auth_change_key_delayed(self):
1989         """ change auth key without disturbing session state (delayed) """
1990         key1 = self.factory.create_random_key(self)
1991         key1.add_vpp_config()
1992         key2 = self.factory.create_random_key(self)
1993         key2.add_vpp_config()
1994         self.vpp_session = VppBFDUDPSession(self, self.pg0,
1995                                             self.pg0.remote_ip4, sha1_key=key1)
1996         self.vpp_session.add_vpp_config()
1997         self.vpp_session.admin_up()
1998         self.test_session = BFDTestSession(
1999             self, self.pg0, AF_INET, sha1_key=key1,
2000             bfd_key_id=self.vpp_session.bfd_key_id)
2001         bfd_session_up(self)
2002         for dummy in range(self.test_session.detect_mult * 2):
2003             p = wait_for_bfd_packet(self)
2004             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2005             self.test_session.send_packet()
2006         self.vpp_session.activate_auth(key2, delayed=True)
2007         for dummy in range(self.test_session.detect_mult * 2):
2008             p = wait_for_bfd_packet(self)
2009             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2010             self.test_session.send_packet()
2011         self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
2012         self.test_session.sha1_key = key2
2013         self.test_session.send_packet()
2014         for dummy in range(self.test_session.detect_mult * 2):
2015             p = wait_for_bfd_packet(self)
2016             self.assert_equal(p[BFD].state, BFDState.up, BFDState)
2017             self.test_session.send_packet()
2018         self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
2019         self.assert_equal(len(self.vapi.collect_events()), 0,
2020                           "number of bfd events")
2021
2022
2023 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2024 class BFDCLITestCase(VppTestCase):
2025     """Bidirectional Forwarding Detection (BFD) (CLI) """
2026     pg0 = None
2027
2028     @classmethod
2029     def setUpClass(cls):
2030         super(BFDCLITestCase, cls).setUpClass()
2031
2032         try:
2033             cls.create_pg_interfaces((0,))
2034             cls.pg0.config_ip4()
2035             cls.pg0.config_ip6()
2036             cls.pg0.resolve_arp()
2037             cls.pg0.resolve_ndp()
2038
2039         except Exception:
2040             super(BFDCLITestCase, cls).tearDownClass()
2041             raise
2042
2043     def setUp(self):
2044         super(BFDCLITestCase, self).setUp()
2045         self.factory = AuthKeyFactory()
2046         self.pg0.enable_capture()
2047
2048     def tearDown(self):
2049         try:
2050             self.vapi.want_bfd_events(enable_disable=0)
2051         except UnexpectedApiReturnValueError:
2052             # some tests aren't subscribed, so this is not an issue
2053             pass
2054         self.vapi.collect_events()  # clear the event queue
2055         super(BFDCLITestCase, self).tearDown()
2056
2057     def cli_verify_no_response(self, cli):
2058         """ execute a CLI, asserting that the response is empty """
2059         self.assert_equal(self.vapi.cli(cli),
2060                           "",
2061                           "CLI command response")
2062
2063     def cli_verify_response(self, cli, expected):
2064         """ execute a CLI, asserting that the response matches expectation """
2065         self.assert_equal(self.vapi.cli(cli).strip(),
2066                           expected,
2067                           "CLI command response")
2068
2069     def test_show(self):
2070         """ show commands """
2071         k1 = self.factory.create_random_key(self)
2072         k1.add_vpp_config()
2073         k2 = self.factory.create_random_key(
2074             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2075         k2.add_vpp_config()
2076         s1 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2077         s1.add_vpp_config()
2078         s2 = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2079                               sha1_key=k2)
2080         s2.add_vpp_config()
2081         self.logger.info(self.vapi.ppcli("show bfd keys"))
2082         self.logger.info(self.vapi.ppcli("show bfd sessions"))
2083         self.logger.info(self.vapi.ppcli("show bfd"))
2084
2085     def test_set_del_sha1_key(self):
2086         """ set/delete SHA1 auth key """
2087         k = self.factory.create_random_key(self)
2088         self.registry.register(k, self.logger)
2089         self.cli_verify_no_response(
2090             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2091             (k.conf_key_id,
2092                 "".join("{:02x}".format(ord(c)) for c in k.key)))
2093         self.assertTrue(k.query_vpp_config())
2094         self.vpp_session = VppBFDUDPSession(
2095             self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
2096         self.vpp_session.add_vpp_config()
2097         self.test_session = \
2098             BFDTestSession(self, self.pg0, AF_INET, sha1_key=k,
2099                            bfd_key_id=self.vpp_session.bfd_key_id)
2100         self.vapi.want_bfd_events()
2101         bfd_session_up(self)
2102         bfd_session_down(self)
2103         # try to replace the secret for the key - should fail because the key
2104         # is in-use
2105         k2 = self.factory.create_random_key(self)
2106         self.cli_verify_response(
2107             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2108             (k.conf_key_id,
2109                 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2110             "bfd key set: `bfd_auth_set_key' API call failed, "
2111             "rv=-103:BFD object in use")
2112         # manipulating the session using old secret should still work
2113         bfd_session_up(self)
2114         bfd_session_down(self)
2115         self.vpp_session.remove_vpp_config()
2116         self.cli_verify_no_response(
2117             "bfd key del conf-key-id %s" % k.conf_key_id)
2118         self.assertFalse(k.query_vpp_config())
2119
2120     def test_set_del_meticulous_sha1_key(self):
2121         """ set/delete meticulous SHA1 auth key """
2122         k = self.factory.create_random_key(
2123             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2124         self.registry.register(k, self.logger)
2125         self.cli_verify_no_response(
2126             "bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
2127             (k.conf_key_id,
2128                 "".join("{:02x}".format(ord(c)) for c in k.key)))
2129         self.assertTrue(k.query_vpp_config())
2130         self.vpp_session = VppBFDUDPSession(self, self.pg0,
2131                                             self.pg0.remote_ip6, af=AF_INET6,
2132                                             sha1_key=k)
2133         self.vpp_session.add_vpp_config()
2134         self.vpp_session.admin_up()
2135         self.test_session = \
2136             BFDTestSession(self, self.pg0, AF_INET6, sha1_key=k,
2137                            bfd_key_id=self.vpp_session.bfd_key_id)
2138         self.vapi.want_bfd_events()
2139         bfd_session_up(self)
2140         bfd_session_down(self)
2141         # try to replace the secret for the key - should fail because the key
2142         # is in-use
2143         k2 = self.factory.create_random_key(self)
2144         self.cli_verify_response(
2145             "bfd key set conf-key-id %s type keyed-sha1 secret %s" %
2146             (k.conf_key_id,
2147                 "".join("{:02x}".format(ord(c)) for c in k2.key)),
2148             "bfd key set: `bfd_auth_set_key' API call failed, "
2149             "rv=-103:BFD object in use")
2150         # manipulating the session using old secret should still work
2151         bfd_session_up(self)
2152         bfd_session_down(self)
2153         self.vpp_session.remove_vpp_config()
2154         self.cli_verify_no_response(
2155             "bfd key del conf-key-id %s" % k.conf_key_id)
2156         self.assertFalse(k.query_vpp_config())
2157
2158     def test_add_mod_del_bfd_udp(self):
2159         """ create/modify/delete IPv4 BFD UDP session """
2160         vpp_session = VppBFDUDPSession(
2161             self, self.pg0, self.pg0.remote_ip4)
2162         self.registry.register(vpp_session, self.logger)
2163         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2164             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2165             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip4,
2166                                 self.pg0.remote_ip4,
2167                                 vpp_session.desired_min_tx,
2168                                 vpp_session.required_min_rx,
2169                                 vpp_session.detect_mult)
2170         self.cli_verify_no_response(cli_add_cmd)
2171         # 2nd add should fail
2172         self.cli_verify_response(
2173             cli_add_cmd,
2174             "bfd udp session add: `bfd_add_add_session' API call"
2175             " failed, rv=-101:Duplicate BFD object")
2176         verify_bfd_session_config(self, vpp_session)
2177         mod_session = VppBFDUDPSession(
2178             self, self.pg0, self.pg0.remote_ip4,
2179             required_min_rx=2 * vpp_session.required_min_rx,
2180             desired_min_tx=3 * vpp_session.desired_min_tx,
2181             detect_mult=4 * vpp_session.detect_mult)
2182         self.cli_verify_no_response(
2183             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2184             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2185             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2186              mod_session.desired_min_tx, mod_session.required_min_rx,
2187              mod_session.detect_mult))
2188         verify_bfd_session_config(self, mod_session)
2189         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2190             "peer-addr %s" % (self.pg0.name,
2191                               self.pg0.local_ip4, self.pg0.remote_ip4)
2192         self.cli_verify_no_response(cli_del_cmd)
2193         # 2nd del is expected to fail
2194         self.cli_verify_response(
2195             cli_del_cmd, "bfd udp session del: `bfd_udp_del_session' API call"
2196             " failed, rv=-102:No such BFD object")
2197         self.assertFalse(vpp_session.query_vpp_config())
2198
2199     def test_add_mod_del_bfd_udp6(self):
2200         """ create/modify/delete IPv6 BFD UDP session """
2201         vpp_session = VppBFDUDPSession(
2202             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6)
2203         self.registry.register(vpp_session, self.logger)
2204         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2205             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2206             "detect-mult %s" % (self.pg0.name, self.pg0.local_ip6,
2207                                 self.pg0.remote_ip6,
2208                                 vpp_session.desired_min_tx,
2209                                 vpp_session.required_min_rx,
2210                                 vpp_session.detect_mult)
2211         self.cli_verify_no_response(cli_add_cmd)
2212         # 2nd add should fail
2213         self.cli_verify_response(
2214             cli_add_cmd,
2215             "bfd udp session add: `bfd_add_add_session' API call"
2216             " failed, rv=-101:Duplicate BFD object")
2217         verify_bfd_session_config(self, vpp_session)
2218         mod_session = VppBFDUDPSession(
2219             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6,
2220             required_min_rx=2 * vpp_session.required_min_rx,
2221             desired_min_tx=3 * vpp_session.desired_min_tx,
2222             detect_mult=4 * vpp_session.detect_mult)
2223         self.cli_verify_no_response(
2224             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2225             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2226             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2227              mod_session.desired_min_tx,
2228              mod_session.required_min_rx, mod_session.detect_mult))
2229         verify_bfd_session_config(self, mod_session)
2230         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2231             "peer-addr %s" % (self.pg0.name,
2232                               self.pg0.local_ip6, self.pg0.remote_ip6)
2233         self.cli_verify_no_response(cli_del_cmd)
2234         # 2nd del is expected to fail
2235         self.cli_verify_response(
2236             cli_del_cmd,
2237             "bfd udp session del: `bfd_udp_del_session' API call"
2238             " failed, rv=-102:No such BFD object")
2239         self.assertFalse(vpp_session.query_vpp_config())
2240
2241     def test_add_mod_del_bfd_udp_auth(self):
2242         """ create/modify/delete IPv4 BFD UDP session (authenticated) """
2243         key = self.factory.create_random_key(self)
2244         key.add_vpp_config()
2245         vpp_session = VppBFDUDPSession(
2246             self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
2247         self.registry.register(vpp_session, self.logger)
2248         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2249             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2250             "detect-mult %s conf-key-id %s bfd-key-id %s"\
2251             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2252                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2253                vpp_session.detect_mult, key.conf_key_id,
2254                vpp_session.bfd_key_id)
2255         self.cli_verify_no_response(cli_add_cmd)
2256         # 2nd add should fail
2257         self.cli_verify_response(
2258             cli_add_cmd,
2259             "bfd udp session add: `bfd_add_add_session' API call"
2260             " failed, rv=-101:Duplicate BFD object")
2261         verify_bfd_session_config(self, vpp_session)
2262         mod_session = VppBFDUDPSession(
2263             self, self.pg0, self.pg0.remote_ip4, sha1_key=key,
2264             bfd_key_id=vpp_session.bfd_key_id,
2265             required_min_rx=2 * vpp_session.required_min_rx,
2266             desired_min_tx=3 * vpp_session.desired_min_tx,
2267             detect_mult=4 * vpp_session.detect_mult)
2268         self.cli_verify_no_response(
2269             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2270             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2271             (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2272              mod_session.desired_min_tx,
2273              mod_session.required_min_rx, mod_session.detect_mult))
2274         verify_bfd_session_config(self, mod_session)
2275         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2276             "peer-addr %s" % (self.pg0.name,
2277                               self.pg0.local_ip4, self.pg0.remote_ip4)
2278         self.cli_verify_no_response(cli_del_cmd)
2279         # 2nd del is expected to fail
2280         self.cli_verify_response(
2281             cli_del_cmd,
2282             "bfd udp session del: `bfd_udp_del_session' API call"
2283             " failed, rv=-102:No such BFD object")
2284         self.assertFalse(vpp_session.query_vpp_config())
2285
2286     def test_add_mod_del_bfd_udp6_auth(self):
2287         """ create/modify/delete IPv6 BFD UDP session (authenticated) """
2288         key = self.factory.create_random_key(
2289             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2290         key.add_vpp_config()
2291         vpp_session = VppBFDUDPSession(
2292             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key)
2293         self.registry.register(vpp_session, self.logger)
2294         cli_add_cmd = "bfd udp session add interface %s local-addr %s " \
2295             "peer-addr %s desired-min-tx %s required-min-rx %s "\
2296             "detect-mult %s conf-key-id %s bfd-key-id %s" \
2297             % (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2298                vpp_session.desired_min_tx, vpp_session.required_min_rx,
2299                vpp_session.detect_mult, key.conf_key_id,
2300                vpp_session.bfd_key_id)
2301         self.cli_verify_no_response(cli_add_cmd)
2302         # 2nd add should fail
2303         self.cli_verify_response(
2304             cli_add_cmd,
2305             "bfd udp session add: `bfd_add_add_session' API call"
2306             " failed, rv=-101:Duplicate BFD object")
2307         verify_bfd_session_config(self, vpp_session)
2308         mod_session = VppBFDUDPSession(
2309             self, self.pg0, self.pg0.remote_ip6, af=AF_INET6, sha1_key=key,
2310             bfd_key_id=vpp_session.bfd_key_id,
2311             required_min_rx=2 * vpp_session.required_min_rx,
2312             desired_min_tx=3 * vpp_session.desired_min_tx,
2313             detect_mult=4 * vpp_session.detect_mult)
2314         self.cli_verify_no_response(
2315             "bfd udp session mod interface %s local-addr %s peer-addr %s "
2316             "desired-min-tx %s required-min-rx %s detect-mult %s" %
2317             (self.pg0.name, self.pg0.local_ip6, self.pg0.remote_ip6,
2318              mod_session.desired_min_tx,
2319              mod_session.required_min_rx, mod_session.detect_mult))
2320         verify_bfd_session_config(self, mod_session)
2321         cli_del_cmd = "bfd udp session del interface %s local-addr %s "\
2322             "peer-addr %s" % (self.pg0.name,
2323                               self.pg0.local_ip6, self.pg0.remote_ip6)
2324         self.cli_verify_no_response(cli_del_cmd)
2325         # 2nd del is expected to fail
2326         self.cli_verify_response(
2327             cli_del_cmd,
2328             "bfd udp session del: `bfd_udp_del_session' API call"
2329             " failed, rv=-102:No such BFD object")
2330         self.assertFalse(vpp_session.query_vpp_config())
2331
2332     def test_auth_on_off(self):
2333         """ turn authentication on and off """
2334         key = self.factory.create_random_key(
2335             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2336         key.add_vpp_config()
2337         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2338         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2339                                         sha1_key=key)
2340         session.add_vpp_config()
2341         cli_activate = \
2342             "bfd udp session auth activate interface %s local-addr %s "\
2343             "peer-addr %s conf-key-id %s bfd-key-id %s"\
2344             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2345                key.conf_key_id, auth_session.bfd_key_id)
2346         self.cli_verify_no_response(cli_activate)
2347         verify_bfd_session_config(self, auth_session)
2348         self.cli_verify_no_response(cli_activate)
2349         verify_bfd_session_config(self, auth_session)
2350         cli_deactivate = \
2351             "bfd udp session auth deactivate interface %s local-addr %s "\
2352             "peer-addr %s "\
2353             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2354         self.cli_verify_no_response(cli_deactivate)
2355         verify_bfd_session_config(self, session)
2356         self.cli_verify_no_response(cli_deactivate)
2357         verify_bfd_session_config(self, session)
2358
2359     def test_auth_on_off_delayed(self):
2360         """ turn authentication on and off (delayed) """
2361         key = self.factory.create_random_key(
2362             self, auth_type=BFDAuthType.meticulous_keyed_sha1)
2363         key.add_vpp_config()
2364         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2365         auth_session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4,
2366                                         sha1_key=key)
2367         session.add_vpp_config()
2368         cli_activate = \
2369             "bfd udp session auth activate interface %s local-addr %s "\
2370             "peer-addr %s conf-key-id %s bfd-key-id %s delayed yes"\
2371             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4,
2372                key.conf_key_id, auth_session.bfd_key_id)
2373         self.cli_verify_no_response(cli_activate)
2374         verify_bfd_session_config(self, auth_session)
2375         self.cli_verify_no_response(cli_activate)
2376         verify_bfd_session_config(self, auth_session)
2377         cli_deactivate = \
2378             "bfd udp session auth deactivate interface %s local-addr %s "\
2379             "peer-addr %s delayed yes"\
2380             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2381         self.cli_verify_no_response(cli_deactivate)
2382         verify_bfd_session_config(self, session)
2383         self.cli_verify_no_response(cli_deactivate)
2384         verify_bfd_session_config(self, session)
2385
2386     def test_admin_up_down(self):
2387         """ put session admin-up and admin-down """
2388         session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
2389         session.add_vpp_config()
2390         cli_down = \
2391             "bfd udp session set-flags admin down interface %s local-addr %s "\
2392             "peer-addr %s "\
2393             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2394         cli_up = \
2395             "bfd udp session set-flags admin up interface %s local-addr %s "\
2396             "peer-addr %s "\
2397             % (self.pg0.name, self.pg0.local_ip4, self.pg0.remote_ip4)
2398         self.cli_verify_no_response(cli_down)
2399         verify_bfd_session_config(self, session, state=BFDState.admin_down)
2400         self.cli_verify_no_response(cli_up)
2401         verify_bfd_session_config(self, session, state=BFDState.down)
2402
2403     def test_set_del_udp_echo_source(self):
2404         """ set/del udp echo source """
2405         self.create_loopback_interfaces([0])
2406         self.loopback0 = self.lo_interfaces[0]
2407         self.loopback0.admin_up()
2408         self.cli_verify_response("show bfd echo-source",
2409                                  "UDP echo source is not set.")
2410         cli_set = "bfd udp echo-source set interface %s" % self.loopback0.name
2411         self.cli_verify_no_response(cli_set)
2412         self.cli_verify_response("show bfd echo-source",
2413                                  "UDP echo source is: %s\n"
2414                                  "IPv4 address usable as echo source: none\n"
2415                                  "IPv6 address usable as echo source: none" %
2416                                  self.loopback0.name)
2417         self.loopback0.config_ip4()
2418         unpacked = unpack("!L", self.loopback0.local_ip4n)
2419         echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
2420         self.cli_verify_response("show bfd echo-source",
2421                                  "UDP echo source is: %s\n"
2422                                  "IPv4 address usable as echo source: %s\n"
2423                                  "IPv6 address usable as echo source: none" %
2424                                  (self.loopback0.name, echo_ip4))
2425         unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
2426         echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
2427                                             unpacked[2], unpacked[3] ^ 1))
2428         self.loopback0.config_ip6()
2429         self.cli_verify_response("show bfd echo-source",
2430                                  "UDP echo source is: %s\n"
2431                                  "IPv4 address usable as echo source: %s\n"
2432                                  "IPv6 address usable as echo source: %s" %
2433                                  (self.loopback0.name, echo_ip4, echo_ip6))
2434         cli_del = "bfd udp echo-source del"
2435         self.cli_verify_no_response(cli_del)
2436         self.cli_verify_response("show bfd echo-source",
2437                                  "UDP echo source is not set.")
2438
2439 if __name__ == '__main__':
2440     unittest.main(testRunner=VppTestRunner)