tests: Remove the unrequired VPP IP address/prefix class wrappers
[vpp.git] / test / bfd.py
1 """ BFD protocol implementation """
2
3 from random import randint
4 from socket import AF_INET, AF_INET6, inet_pton
5 from scapy.all import bind_layers
6 from scapy.layers.inet import UDP
7 from scapy.packet import Packet
8 from scapy.fields import BitField, BitEnumField, XByteField, FlagsField,\
9     ConditionalField, StrField
10 from vpp_object import VppObject
11 from util import NumericConstant
12 from vpp_papi import VppEnum
13
14
15 class BFDDiagCode(NumericConstant):
16     """ BFD Diagnostic Code """
17     no_diagnostic = 0
18     control_detection_time_expired = 1
19     echo_function_failed = 2
20     neighbor_signaled_session_down = 3
21     forwarding_plane_reset = 4
22     path_down = 5
23     concatenated_path_down = 6
24     administratively_down = 7
25     reverse_concatenated_path_down = 8
26
27     desc_dict = {
28         no_diagnostic: "No diagnostic",
29         control_detection_time_expired: "Control Detection Time Expired",
30         echo_function_failed: "Echo Function Failed",
31         neighbor_signaled_session_down: "Neighbor Signaled Session Down",
32         forwarding_plane_reset: "Forwarding Plane Reset",
33         path_down: "Path Down",
34         concatenated_path_down: "Concatenated Path Down",
35         administratively_down: "Administratively Down",
36         reverse_concatenated_path_down: "Reverse Concatenated Path Down",
37     }
38
39
40 class BFDState(NumericConstant):
41     """ BFD State """
42     admin_down = 0
43     down = 1
44     init = 2
45     up = 3
46
47     desc_dict = {
48         admin_down: "AdminDown",
49         down: "Down",
50         init: "Init",
51         up: "Up",
52     }
53
54
55 class BFDAuthType(NumericConstant):
56     """ BFD Authentication Type """
57     no_auth = 0
58     simple_pwd = 1
59     keyed_md5 = 2
60     meticulous_keyed_md5 = 3
61     keyed_sha1 = 4
62     meticulous_keyed_sha1 = 5
63
64     desc_dict = {
65         no_auth: "No authentication",
66         simple_pwd: "Simple Password",
67         keyed_md5: "Keyed MD5",
68         meticulous_keyed_md5: "Meticulous Keyed MD5",
69         keyed_sha1: "Keyed SHA1",
70         meticulous_keyed_sha1: "Meticulous Keyed SHA1",
71     }
72
73
74 def bfd_is_auth_used(pkt):
75     """ is packet authenticated? """
76     return "A" in pkt.sprintf("%BFD.flags%")
77
78
79 def bfd_is_simple_pwd_used(pkt):
80     """ is simple password authentication used? """
81     return bfd_is_auth_used(pkt) and pkt.auth_type == BFDAuthType.simple_pwd
82
83
84 def bfd_is_sha1_used(pkt):
85     """ is sha1 authentication used? """
86     return bfd_is_auth_used(pkt) and pkt.auth_type in \
87         (BFDAuthType.keyed_sha1, BFDAuthType.meticulous_keyed_sha1)
88
89
90 def bfd_is_md5_used(pkt):
91     """ is md5 authentication used? """
92     return bfd_is_auth_used(pkt) and pkt.auth_type in \
93         (BFDAuthType.keyed_md5, BFDAuthType.meticulous_keyed_md5)
94
95
96 def bfd_is_md5_or_sha1_used(pkt):
97     """ is md5 or sha1 used? """
98     return bfd_is_md5_used(pkt) or bfd_is_sha1_used(pkt)
99
100
101 class BFD(Packet):
102     """ BFD protocol layer for scapy """
103
104     udp_dport = 3784  #: BFD destination port per RFC 5881
105     udp_dport_echo = 3785  # : BFD destination port for ECHO per RFC 5881
106     udp_sport_min = 49152  #: BFD source port min value per RFC 5881
107     udp_sport_max = 65535  #: BFD source port max value per RFC 5881
108     bfd_pkt_len = 24  # : length of BFD pkt without authentication section
109     sha1_auth_len = 28  # : length of authentication section if SHA1 used
110
111     name = "BFD"
112
113     fields_desc = [
114         BitField("version", 1, 3),
115         BitEnumField("diag", 0, 5, BFDDiagCode.desc_dict),
116         BitEnumField("state", 0, 2, BFDState.desc_dict),
117         FlagsField("flags", 0, 6, ['M', 'D', 'A', 'C', 'F', 'P']),
118         XByteField("detect_mult", 0),
119         BitField("length", bfd_pkt_len, 8),
120         BitField("my_discriminator", 0, 32),
121         BitField("your_discriminator", 0, 32),
122         BitField("desired_min_tx_interval", 0, 32),
123         BitField("required_min_rx_interval", 0, 32),
124         BitField("required_min_echo_rx_interval", 0, 32),
125         ConditionalField(
126             BitEnumField("auth_type", 0, 8, BFDAuthType.desc_dict),
127             bfd_is_auth_used),
128         ConditionalField(BitField("auth_len", 0, 8), bfd_is_auth_used),
129         ConditionalField(BitField("auth_key_id", 0, 8), bfd_is_auth_used),
130         ConditionalField(BitField("auth_reserved", 0, 8),
131                          bfd_is_md5_or_sha1_used),
132         ConditionalField(
133             BitField("auth_seq_num", 0, 32), bfd_is_md5_or_sha1_used),
134         ConditionalField(StrField("auth_key_hash", "0" * 16), bfd_is_md5_used),
135         ConditionalField(
136             StrField("auth_key_hash", "0" * 20), bfd_is_sha1_used),
137     ]
138
139     def mysummary(self):
140         return self.sprintf("BFD(my_disc=%BFD.my_discriminator%,"
141                             "your_disc=%BFD.your_discriminator%)")
142
143
144 # glue the BFD packet class to scapy parser
145 bind_layers(UDP, BFD, dport=BFD.udp_dport)
146
147
148 class BFD_vpp_echo(Packet):
149     """ BFD echo packet as used by VPP (non-rfc, as rfc doesn't define one) """
150
151     udp_dport = 3785  #: BFD echo destination port per RFC 5881
152     name = "BFD_VPP_ECHO"
153
154     fields_desc = [
155         BitField("discriminator", 0, 32),
156         BitField("expire_time_clocks", 0, 64),
157         BitField("checksum", 0, 64)
158     ]
159
160     def mysummary(self):
161         return self.sprintf(
162             "BFD_VPP_ECHO(disc=%BFD_VPP_ECHO.discriminator%,"
163             "expire_time_clocks=%BFD_VPP_ECHO.expire_time_clocks%)")
164
165
166 # glue the BFD echo packet class to scapy parser
167 bind_layers(UDP, BFD_vpp_echo, dport=BFD_vpp_echo.udp_dport)
168
169
170 class VppBFDAuthKey(VppObject):
171     """ Represents BFD authentication key in VPP """
172
173     def __init__(self, test, conf_key_id, auth_type, key):
174         self._test = test
175         self._key = key
176         self._auth_type = auth_type
177         test.assertIn(auth_type, BFDAuthType.desc_dict)
178         self._conf_key_id = conf_key_id
179
180     @property
181     def test(self):
182         """ Test which created this key """
183         return self._test
184
185     @property
186     def auth_type(self):
187         """ Authentication type for this key """
188         return self._auth_type
189
190     @property
191     def key(self):
192         """ key data """
193         return self._key
194
195     @key.setter
196     def key(self, value):
197         self._key = value
198
199     @property
200     def conf_key_id(self):
201         """ configuration key ID """
202         return self._conf_key_id
203
204     def add_vpp_config(self):
205         self.test.vapi.bfd_auth_set_key(
206             conf_key_id=self._conf_key_id, auth_type=self._auth_type,
207             key=self._key, key_len=len(self._key))
208         self._test.registry.register(self, self.test.logger)
209
210     def get_bfd_auth_keys_dump_entry(self):
211         """ get the entry in the auth keys dump corresponding to this key """
212         result = self.test.vapi.bfd_auth_keys_dump()
213         for k in result:
214             if k.conf_key_id == self._conf_key_id:
215                 return k
216         return None
217
218     def query_vpp_config(self):
219         return self.get_bfd_auth_keys_dump_entry() is not None
220
221     def remove_vpp_config(self):
222         self.test.vapi.bfd_auth_del_key(conf_key_id=self._conf_key_id)
223
224     def object_id(self):
225         return "bfd-auth-key-%s" % self._conf_key_id
226
227
228 class VppBFDUDPSession(VppObject):
229     """ Represents BFD UDP session in VPP """
230
231     def __init__(self, test, interface, peer_addr, local_addr=None, af=AF_INET,
232                  desired_min_tx=300000, required_min_rx=300000, detect_mult=3,
233                  sha1_key=None, bfd_key_id=None, is_tunnel=False):
234         self._test = test
235         self._interface = interface
236         self._af = af
237         if local_addr:
238             self._local_addr = local_addr
239         else:
240             self._local_addr = None
241         self._peer_addr = peer_addr
242         self._desired_min_tx = desired_min_tx
243         self._required_min_rx = required_min_rx
244         self._detect_mult = detect_mult
245         self._sha1_key = sha1_key
246         if bfd_key_id is not None:
247             self._bfd_key_id = bfd_key_id
248         else:
249             self._bfd_key_id = randint(0, 255)
250         self._is_tunnel = is_tunnel
251
252     @property
253     def test(self):
254         """ Test which created this session """
255         return self._test
256
257     @property
258     def interface(self):
259         """ Interface on which this session lives """
260         return self._interface
261
262     @property
263     def af(self):
264         """ Address family - AF_INET or AF_INET6 """
265         return self._af
266
267     @property
268     def local_addr(self):
269         """ BFD session local address (VPP address) """
270         if self._local_addr is None:
271             if self.af == AF_INET:
272                 return self._interface.local_ip4
273             elif self.af == AF_INET6:
274                 return self._interface.local_ip6
275             else:
276                 raise Exception("Unexpected af '%s'" % self.af)
277         return self._local_addr
278
279     @property
280     def peer_addr(self):
281         """ BFD session peer address """
282         return self._peer_addr
283
284     def get_bfd_udp_session_dump_entry(self):
285         """ get the namedtuple entry from bfd udp session dump """
286         result = self.test.vapi.bfd_udp_session_dump()
287         for s in result:
288             self.test.logger.debug("session entry: %s" % str(s))
289             if s.sw_if_index == self.interface.sw_if_index:
290                 if self.af == AF_INET \
291                         and self.interface.local_ip4 == str(s.local_addr) \
292                         and self.interface.remote_ip4 == str(s.peer_addr):
293                     return s
294                 if self.af == AF_INET6 \
295                         and self.interface.local_ip6 == str(s.local_addr) \
296                         and self.interface.remote_ip6 == str(s.peer_addr):
297                     return s
298         return None
299
300     @property
301     def state(self):
302         """ BFD session state """
303         session = self.get_bfd_udp_session_dump_entry()
304         if session is None:
305             raise Exception("Could not find BFD session in VPP response")
306         return session.state
307
308     @property
309     def desired_min_tx(self):
310         """ desired minimum tx interval """
311         return self._desired_min_tx
312
313     @property
314     def required_min_rx(self):
315         """ required minimum rx interval """
316         return self._required_min_rx
317
318     @property
319     def detect_mult(self):
320         """ detect multiplier """
321         return self._detect_mult
322
323     @property
324     def sha1_key(self):
325         """ sha1 key """
326         return self._sha1_key
327
328     @property
329     def bfd_key_id(self):
330         """ bfd key id in use """
331         return self._bfd_key_id
332
333     @property
334     def is_tunnel(self):
335         return self._is_tunnel
336
337     def activate_auth(self, key, bfd_key_id=None, delayed=False):
338         """ activate authentication for this session """
339         self._bfd_key_id = bfd_key_id if bfd_key_id else randint(0, 255)
340         self._sha1_key = key
341         conf_key_id = self._sha1_key.conf_key_id
342         is_delayed = 1 if delayed else 0
343         self.test.vapi.bfd_udp_auth_activate(
344             sw_if_index=self._interface.sw_if_index,
345             local_addr=self.local_addr,
346             peer_addr=self.peer_addr,
347             bfd_key_id=self._bfd_key_id,
348             conf_key_id=conf_key_id,
349             is_delayed=is_delayed)
350
351     def deactivate_auth(self, delayed=False):
352         """ deactivate authentication """
353         self._bfd_key_id = None
354         self._sha1_key = None
355         is_delayed = 1 if delayed else 0
356         self.test.vapi.bfd_udp_auth_deactivate(
357             sw_if_index=self._interface.sw_if_index,
358             local_addr=self.local_addr,
359             peer_addr=self.peer_addr,
360             is_delayed=is_delayed)
361
362     def modify_parameters(self,
363                           detect_mult=None,
364                           desired_min_tx=None,
365                           required_min_rx=None):
366         """ modify session parameters """
367         if detect_mult:
368             self._detect_mult = detect_mult
369         if desired_min_tx:
370             self._desired_min_tx = desired_min_tx
371         if required_min_rx:
372             self._required_min_rx = required_min_rx
373         self.test.vapi.bfd_udp_mod(sw_if_index=self._interface.sw_if_index,
374                                    desired_min_tx=self.desired_min_tx,
375                                    required_min_rx=self.required_min_rx,
376                                    detect_mult=self.detect_mult,
377                                    local_addr=self.local_addr,
378                                    peer_addr=self.peer_addr)
379
380     def add_vpp_config(self):
381         bfd_key_id = self._bfd_key_id if self._sha1_key else None
382         conf_key_id = self._sha1_key.conf_key_id if self._sha1_key else None
383         is_authenticated = True if self._sha1_key else False
384         self.test.vapi.bfd_udp_add(sw_if_index=self._interface.sw_if_index,
385                                    desired_min_tx=self.desired_min_tx,
386                                    required_min_rx=self.required_min_rx,
387                                    detect_mult=self.detect_mult,
388                                    local_addr=self.local_addr,
389                                    peer_addr=self.peer_addr,
390                                    bfd_key_id=bfd_key_id,
391                                    conf_key_id=conf_key_id,
392                                    is_authenticated=is_authenticated)
393         self._test.registry.register(self, self.test.logger)
394
395     def query_vpp_config(self):
396         session = self.get_bfd_udp_session_dump_entry()
397         return session is not None
398
399     def remove_vpp_config(self):
400         self.test.vapi.bfd_udp_del(self._interface.sw_if_index,
401                                    local_addr=self.local_addr,
402                                    peer_addr=self.peer_addr)
403
404     def object_id(self):
405         return "bfd-udp-%s-%s-%s-%s" % (self._interface.sw_if_index,
406                                         self.local_addr,
407                                         self.peer_addr,
408                                         self.af)
409
410     def admin_up(self):
411         """ set bfd session admin-up """
412         self.test.vapi.bfd_udp_session_set_flags(
413             flags=VppEnum.vl_api_if_status_flags_t.IF_STATUS_API_FLAG_ADMIN_UP,
414             sw_if_index=self._interface.sw_if_index,
415             local_addr=self.local_addr,
416             peer_addr=self.peer_addr)
417
418     def admin_down(self):
419         """ set bfd session admin-down """
420         self.test.vapi.bfd_udp_session_set_flags(
421             flags=0, sw_if_index=self._interface.sw_if_index,
422             local_addr=self.local_addr,
423             peer_addr=self.peer_addr)