1 # Copyright (c) 2019 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
14 """Python API executor library.
21 import struct # vpp-papi can raise struct.error
27 from pprint import pformat
28 from robot.api import logger
30 from resources.libraries.python.Constants import Constants
31 from resources.libraries.python.LocalExecution import run
32 from resources.libraries.python.FilteredLogger import FilteredLogger
33 from resources.libraries.python.PapiHistory import PapiHistory
34 from resources.libraries.python.ssh import (
35 SSH, SSHTimeout, exec_cmd_no_error, scp_node)
36 from resources.libraries.python.topology import Topology, SocketType
37 from resources.libraries.python.VppApiCrc import VppApiCrcChecker
40 __all__ = [u"PapiExecutor", u"PapiSocketExecutor"]
44 """A helper method, to make namedtuple-like object accessible as dict.
46 If the object is namedtuple-like, its _asdict() form is returned,
47 but in the returned object __getitem__ method is wrapped
48 to dictize also any items returned.
49 If the object does not have _asdict, it will be returned without any change.
50 Integer keys still access the object as tuple.
52 A more useful version would be to keep obj mostly as a namedtuple,
53 just add getitem for string keys. Unfortunately, namedtuple inherits
54 from tuple, including its read-only __getitem__ attribute,
55 so we cannot monkey-patch it.
57 TODO: Create a proxy for named tuple to allow that.
59 :param obj: Arbitrary object to dictize.
61 :returns: Dictized object.
62 :rtype: same as obj type or collections.OrderedDict
64 if not hasattr(obj, u"_asdict"):
67 old_get = ret.__getitem__
68 new_get = lambda self, key: dictize(old_get(self, key))
69 ret.__getitem__ = new_get
73 class PapiSocketExecutor:
74 """Methods for executing VPP Python API commands on forwarded socket.
76 The current implementation connects for the duration of resource manager.
77 Delay for accepting connection is 10s, and disconnect is explicit.
78 TODO: Decrease 10s to value that is long enough for creating connection
79 and short enough to not affect performance.
81 The current implementation downloads and parses .api.json files only once
82 and stores a VPPApiClient instance (disconnected) as a class variable.
83 Accessing multiple nodes with different APIs is therefore not supported.
85 The current implementation seems to run into read error occasionally.
86 Not sure if the error is in Python code on Robot side, ssh forwarding,
87 or socket handling at VPP side. Anyway, reconnect after some sleep
88 seems to help, hoping repeated command execution does not lead to surprises.
89 The reconnection is logged at WARN level, so it is prominently shown
90 in log.html, so we can see how frequently it happens.
92 TODO: Support sockets in NFs somehow.
93 TODO: Support handling of retval!=0 without try/except in caller.
95 Note: Use only with "with" statement, e.g.:
98 with PapiSocketExecutor(node) as papi_exec:
99 reply = papi_exec.add(cmd).get_reply(err_msg)
101 This class processes two classes of VPP PAPI methods:
102 1. Simple request / reply: method='request'.
103 2. Dump functions: method='dump'.
105 Note that access to VPP stats over socket is not supported yet.
107 The recommended ways of use are (examples):
109 1. Simple request / reply
111 a. One request with no arguments:
114 with PapiSocketExecutor(node) as papi_exec:
115 reply = papi_exec.add(cmd).get_reply(err_msg)
117 b. Three requests with arguments, the second and the third ones are the same
118 but with different arguments.
120 with PapiSocketExecutor(node) as papi_exec:
121 replies = papi_exec.add(cmd1, **args1).add(cmd2, **args2).\
122 add(cmd2, **args3).get_replies(err_msg)
126 cmd = 'sw_interface_rx_placement_dump'
127 with PapiSocketExecutor(node) as papi_exec:
128 details = papi_exec.add(cmd, sw_if_index=ifc['vpp_sw_index']).\
132 # Class cache for reuse between instances.
134 """Takes long time to create, stores all PAPI functions and types."""
136 """Accesses .api.json files at creation, caching allows deleting them."""
138 def __init__(self, node, remote_vpp_socket=Constants.SOCKSVR_PATH):
139 """Store the given arguments, declare managed variables.
141 :param node: Node to connect to and forward unix domain socket from.
142 :param remote_vpp_socket: Path to remote socket to tunnel to.
144 :type remote_vpp_socket: str
147 self._remote_vpp_socket = remote_vpp_socket
148 # The list of PAPI commands to be executed on the node.
149 self._api_command_list = list()
150 # The following values are set on enter, reset on exit.
151 self._temp_dir = None
152 self._ssh_control_socket = None
153 self._local_vpp_socket = None
154 self.initialize_vpp_instance()
156 def initialize_vpp_instance(self):
157 """Create VPP instance with bindings to API calls, store as class field.
159 No-op if the instance had been stored already.
161 The instance is initialized for unix domain socket access,
162 it has initialized all the bindings, but it is not connected
163 (to a local socket) yet.
165 This method downloads .api.json files from self._node
166 into a temporary directory, deletes them finally.
168 if self.vpp_instance:
170 cls = self.__class__ # Shorthand for setting class fields.
172 tmp_dir = tempfile.mkdtemp(dir=u"/tmp")
174 # Pack, copy and unpack Python part of VPP installation from _node.
175 # TODO: Use rsync or recursive version of ssh.scp_node instead?
177 exec_cmd_no_error(node, [u"rm", u"-rf", u"/tmp/papi.txz"])
178 # Papi python version depends on OS (and time).
179 # Python 3.x (the glob below matches python3* only), site-packages or dist-packages.
180 installed_papi_glob = u"/usr/lib/python3*/*-packages/vpp_papi"
181 # We need to wrap this command in bash, in order to expand globs,
182 # and as ssh does join, the inner command has to be quoted.
183 inner_cmd = u" ".join([
184 u"tar", u"cJf", u"/tmp/papi.txz", u"--exclude=*.pyc",
185 installed_papi_glob, u"/usr/share/vpp/api"
187 exec_cmd_no_error(node, [u"bash", u"-c", u"'" + inner_cmd + u"'"])
188 scp_node(node, tmp_dir + u"/papi.txz", u"/tmp/papi.txz", get=True)
189 run([u"tar", u"xf", tmp_dir + u"/papi.txz", u"-C", tmp_dir])
190 api_json_directory = tmp_dir + u"/usr/share/vpp/api"
191 # Perform initial checks before .api.json files are gone,
192 # by creating the checker instance.
193 cls.crc_checker = VppApiCrcChecker(api_json_directory)
194 # When present locally, we finally can find the installation path.
195 package_path = glob.glob(tmp_dir + installed_papi_glob)[0]
196 # Package path has to be one level above the vpp_papi directory.
197 package_path = package_path.rsplit(u"/", 1)[0]
198 sys.path.append(package_path)
199 # pylint: disable=import-outside-toplevel, import-error
200 from vpp_papi.vpp_papi import VPPApiClient as vpp_class
201 vpp_class.apidir = api_json_directory
202 # We need to create instance before removing from sys.path.
203 cls.vpp_instance = vpp_class(
204 use_socket=True, server_address=u"TBD", async_thread=False,
205 read_timeout=14, logger=FilteredLogger(logger, u"INFO"))
206 # Cannot use loglevel parameter, robot.api.logger lacks support.
207 # TODO: Stop overriding read_timeout when VPP-1722 is fixed.
209 shutil.rmtree(tmp_dir)
210 if sys.path[-1] == package_path:
214 """Create a tunnel, connect VPP instance.
216 Only at this point are the local socket names created
217 in a temporary directory, because VIRL runs 3 pybots at once,
218 so hardcoding local filenames does not work.
221 :rtype: PapiSocketExecutor
223 # Parsing takes longer than connecting, prepare instance before tunnel.
224 vpp_instance = self.vpp_instance
226 self._temp_dir = tempfile.mkdtemp(dir=u"/tmp")
227 self._local_vpp_socket = self._temp_dir + u"/vpp-api.sock"
228 self._ssh_control_socket = self._temp_dir + u"/ssh.sock"
229 ssh_socket = self._ssh_control_socket
230 # Cleanup possibilities.
231 ret_code, _ = run([u"ls", ssh_socket], check=False)
233 # This branch never seems to be hit in CI,
234 # but may be useful when testing manually.
236 [u"ssh", u"-S", ssh_socket, u"-O", u"exit", u"0.0.0.0"],
237 check=False, log=True
239 # TODO: Is any sleep necessary? How to prove if not?
240 run([u"sleep", u"0.1"])
241 run([u"rm", u"-vrf", ssh_socket])
242 # Even if ssh can perhaps reuse this file,
243 # we need to remove it for readiness detection to work correctly.
244 run([u"rm", u"-rvf", self._local_vpp_socket])
245 # On VIRL, the ssh user is not added to "vpp" group,
246 # so we need to change remote socket file access rights.
248 node, u"chmod o+rwx " + self._remote_vpp_socket, sudo=True
250 # We use the sleep command. The ssh command will exit in 10 seconds,
251 # unless a local socket connection is established,
252 # in which case the ssh command will exit only when
253 # the ssh connection is closed again (via control socket).
254 # The log level is to suppress "Warning: Permanently added" messages.
256 u"ssh", u"-S", ssh_socket, u"-M",
257 u"-o", u"LogLevel=ERROR", u"-o", u"UserKnownHostsFile=/dev/null",
258 u"-o", u"StrictHostKeyChecking=no",
259 u"-o", u"ExitOnForwardFailure=yes",
260 u"-L", self._local_vpp_socket + u":" + self._remote_vpp_socket,
261 u"-p", str(node[u"port"]), node[u"username"] + u"@" + node[u"host"],
264 priv_key = node.get(u"priv_key")
266 # This is tricky. We need a file to pass the value to ssh command.
267 # And we need ssh command, because paramiko does not support sockets
268 # (neither ssh_socket, nor _remote_vpp_socket).
269 key_file = tempfile.NamedTemporaryFile()
270 key_file.write(priv_key)
271 # Make sure the content is written, but do not close yet.
273 ssh_cmd[1:1] = [u"-i", key_file.name]
274 password = node.get(u"password")
276 # Prepend sshpass command to set password.
277 ssh_cmd[:0] = [u"sshpass", u"-p", password]
278 time_stop = time.time() + 10.0
279 # subprocess.Popen seems to be the best way to run commands
280 # on background. Other ways (shell=True with "&" and ssh with -f)
281 # seem to be too dependent on shell behavior.
282 # In particular, -f does NOT return values for run().
283 subprocess.Popen(ssh_cmd)
284 # Check socket presence on local side.
285 while time.time() < time_stop:
286 # It can take a moment for ssh to create the socket file.
288 [u"ls", u"-l", self._local_vpp_socket], check=False
294 raise RuntimeError(u"Local side socket has not appeared.")
296 # Socket up means the key has been read. Delete file by closing it.
298 # Everything is ready, set the local socket address and connect.
299 vpp_instance.transport.server_address = self._local_vpp_socket
300 # It seems we can get read error even if every preceding check passed.
301 # Single retry seems to help.
304 vpp_instance.connect_sync(u"csit_socket")
305 except (IOError, struct.error) as err:
306 logger.warn(f"Got initial connect error {err!r}")
307 vpp_instance.disconnect()
311 raise RuntimeError(u"Failed to connect to VPP over a socket.")
314 def __exit__(self, exc_type, exc_val, exc_tb):
315 """Disconnect the vpp instance, tear down the SSH tunnel.
317 Also remove the local sockets by deleting the temporary directory.
318 Arguments related to possible exception are entirely ignored.
320 self.vpp_instance.disconnect()
322 u"ssh", u"-S", self._ssh_control_socket, u"-O", u"exit", u"0.0.0.0"
324 shutil.rmtree(self._temp_dir)
326 def add(self, csit_papi_command, history=True, **kwargs):
327 """Add next command to internal command list; return self.
329 Unless disabled, new entry to papi history is also added at this point.
330 The argument name 'csit_papi_command' must be unique enough as it cannot
331 be repeated in kwargs.
332 The kwargs dict is deep-copied, so it is safe to use the original
333 with partial modifications for subsequent commands.
335 Any pending conflicts from .api.json processing are raised.
336 Then the command name is checked for known CRCs.
337 Unsupported commands raise an exception, as CSIT change
338 should not start using messages without making sure which CRCs
340 Each CRC issue is raised only once, so subsequent tests
341 can raise other issues.
343 :param csit_papi_command: VPP API command.
344 :param history: Enable/disable adding command to PAPI command history.
345 :param kwargs: Optional key-value arguments.
346 :type csit_papi_command: str
349 :returns: self, so that method chaining is possible.
350 :rtype: PapiSocketExecutor
351 :raises RuntimeError: If unverified or conflicting CRC is encountered.
353 self.crc_checker.report_initial_conflicts()
355 PapiHistory.add_to_papi_history(
356 self._node, csit_papi_command, **kwargs
358 self.crc_checker.check_api_name(csit_papi_command)
359 self._api_command_list.append(
361 api_name=csit_papi_command,
362 api_args=copy.deepcopy(kwargs)
367 def get_replies(self, err_msg="Failed to get replies."):
368 """Get replies from VPP Python API.
370 The replies are parsed into dict-like objects,
371 "retval" field is guaranteed to be zero on success.
373 :param err_msg: The message used if the PAPI command(s) execution fails.
375 :returns: Responses, dict objects with fields due to API and "retval".
377 :raises AssertionError: If retval is nonzero, parsing or ssh error.
379 return self._execute(err_msg=err_msg)
381 def get_reply(self, err_msg=u"Failed to get reply."):
382 """Get reply from VPP Python API.
384 The reply is parsed into dict-like object,
385 "retval" field is guaranteed to be zero on success.
387 TODO: Discuss exception types to raise, unify with inner methods.
389 :param err_msg: The message used if the PAPI command(s) execution fails.
391 :returns: Response, dict object with fields due to API and "retval".
393 :raises AssertionError: If retval is nonzero, parsing or ssh error.
395 replies = self.get_replies(err_msg=err_msg)
396 if len(replies) != 1:
397 raise RuntimeError(f"Expected single reply, got {replies!r}")
400 def get_sw_if_index(self, err_msg=u"Failed to get reply."):
401 """Get sw_if_index from reply from VPP Python API.
403 Frequently, the caller is only interested in sw_if_index field
404 of the reply, this wrapper makes such call sites shorter.
406 TODO: Discuss exception types to raise, unify with inner methods.
408 :param err_msg: The message used if the PAPI command(s) execution fails.
410 :returns: Response, sw_if_index value of the reply.
412 :raises AssertionError: If retval is nonzero, parsing or ssh error.
414 reply = self.get_reply(err_msg=err_msg)
415 logger.trace(f"Getting index from {reply!r}")
416 return reply[u"sw_if_index"]
418 def get_details(self, err_msg="Failed to get dump details."):
419 """Get dump details from VPP Python API.
421 The details are parsed into dict-like objects.
422 The number of details per single dump command can vary,
423 and all association between details and dumps is lost,
424 so if you care about the association (as opposed to
425 logging everything at once for debugging purposes),
426 it is recommended to call get_details for each dump (type) separately.
428 :param err_msg: The message used if the PAPI command(s) execution fails.
430 :returns: Details, dict objects with fields due to API without "retval".
433 return self._execute(err_msg)
437 node, cli_cmd, log=True, remote_vpp_socket=Constants.SOCKSVR_PATH):
438 """Run a CLI command as cli_inband, return the "reply" field of reply.
440 Optionally, log the field value.
442 :param node: Node to run command on.
443 :param cli_cmd: The CLI command to be run on the node.
444 :param remote_vpp_socket: Path to remote socket to tunnel to.
445 :param log: If True, the response is logged.
447 :type remote_vpp_socket: str
450 :returns: CLI output.
457 err_msg = f"Failed to run 'cli_inband {cli_cmd}' PAPI command " \
458 f"on host {node[u'host']}"
460 with PapiSocketExecutor(node, remote_vpp_socket) as papi_exec:
461 reply = papi_exec.add(cmd, **args).get_reply(err_msg)["reply"]
464 f"{cmd} ({node[u'host']} - {remote_vpp_socket}):\n"
470 def run_cli_cmd_on_all_sockets(node, cli_cmd, log=True):
471 """Run a CLI command as cli_inband, on all sockets in topology file.
473 :param node: Node to run command on.
474 :param cli_cmd: The CLI command to be run on the node.
475 :param log: If True, the response is logged.
480 sockets = Topology.get_node_sockets(node, socket_type=SocketType.PAPI)
482 for socket in sockets.values():
483 PapiSocketExecutor.run_cli_cmd(
484 node, cli_cmd, log=log, remote_vpp_socket=socket
488 def dump_and_log(node, cmds):
489 """Dump and log requested information, return None.
491 :param node: DUT node.
492 :param cmds: Dump commands to be executed.
494 :type cmds: list of str
496 with PapiSocketExecutor(node) as papi_exec:
498 dump = papi_exec.add(cmd).get_details()
499 logger.debug(f"{cmd}:\n{pformat(dump)}")
501 def _execute(self, err_msg=u"Undefined error message", exp_rv=0):
502 """Turn internal command list into data and execute; return replies.
504 This method also clears the internal command list.
507 Do not use this method in L1 keywords. Use:
513 :param err_msg: The message used if the PAPI command(s) execution fails.
515 :returns: Papi responses parsed into a dict-like object,
516 with fields due to API (possibly including retval).
518 :raises AssertionError: If the replies are not all correct.
520 vpp_instance = self.vpp_instance
521 local_list = self._api_command_list
522 # Clear first as execution may fail.
523 self._api_command_list = list()
525 for command in local_list:
526 api_name = command[u"api_name"]
527 papi_fn = getattr(vpp_instance.api, api_name)
530 reply = papi_fn(**command[u"api_args"])
531 except (IOError, struct.error) as err:
532 # Occasionally an error happens, try reconnect.
533 logger.warn(f"Reconnect after error: {err!r}")
534 self.vpp_instance.disconnect()
535 # Testing shows immediate reconnect fails.
537 self.vpp_instance.connect_sync(u"csit_socket")
538 logger.trace(u"Reconnected.")
539 reply = papi_fn(**command[u"api_args"])
540 except (AttributeError, IOError, struct.error) as err:
541 raise AssertionError(err_msg) from err
542 # *_dump commands return list of objects, convert, ordinary reply.
543 if not isinstance(reply, list):
546 self.crc_checker.check_api_name(item.__class__.__name__)
547 dict_item = dictize(item)
548 if u"retval" in dict_item.keys():
549 # *_details messages do not contain retval.
550 retval = dict_item[u"retval"]
552 # TODO: What exactly to log and raise here?
553 raise AssertionError(
554 f"Retval {retval!r} does not match expected "
557 replies.append(dict_item)
562 """Contains methods for executing VPP Python API commands on DUTs.
564 TODO: Remove .add step, make get_stats accept paths directly.
566 This class processes only one type of VPP PAPI methods: vpp-stats.
568 The recommended ways of use are (examples):
570 path = ['^/if', '/err/ip4-input', '/sys/node/ip4-input']
571 with PapiExecutor(node) as papi_exec:
572 stats = papi_exec.add(api_name='vpp-stats', path=path).get_stats()
574 print('RX interface core 0, sw_if_index 0:\n{0}'.\
575 format(stats[0]['/if/rx'][0][0]))
580 path_2 = ['^/if', '/err/ip4-input', '/sys/node/ip4-input']
581 with PapiExecutor(node) as papi_exec:
582 stats = papi_exec.add('vpp-stats', path=path_1).\
583 add('vpp-stats', path=path_2).get_stats()
585 print('RX interface core 0, sw_if_index 0:\n{0}'.\
586 format(stats[1]['/if/rx'][0][0]))
588 Note: In this case, when PapiExecutor method 'add' is used:
589 - its parameter 'csit_papi_command' is used only to keep information
590 that vpp-stats are requested. It is not further processed but it is
591 included in the PAPI history this way:
592 vpp-stats(path=['^/if', '/err/ip4-input', '/sys/node/ip4-input'])
593 Always use csit_papi_command="vpp-stats" if the VPP PAPI method
595 - the second parameter must be 'path' as it is used by PapiExecutor
599 def __init__(self, node):
602 :param node: Node to run command(s) on.
605 # Node to run command(s) on.
608 # The list of PAPI commands to be executed on the node.
609 self._api_command_list = list()
615 self._ssh.connect(self._node)
618 f"Cannot open SSH connection to host {self._node[u'host']} "
619 f"to execute PAPI command(s)"
# Context-manager exit hook: disconnect the SSH session for self._node.
# The exception arguments (exc_type, exc_val, exc_tb) are not used, and
# nothing is returned, so an exception from the "with" body is not suppressed.
623 def __exit__(self, exc_type, exc_val, exc_tb):
624 self._ssh.disconnect(self._node)
626 def add(self, csit_papi_command=u"vpp-stats", history=True, **kwargs):
627 """Add next command to internal command list; return self.
629 The argument name 'csit_papi_command' must be unique enough as it cannot
630 be repeated in kwargs.
631 The kwargs dict is deep-copied, so it is safe to use the original
632 with partial modifications for subsequent commands.
634 :param csit_papi_command: VPP API command.
635 :param history: Enable/disable adding command to PAPI command history.
636 :param kwargs: Optional key-value arguments.
637 :type csit_papi_command: str
640 :returns: self, so that method chaining is possible.
644 PapiHistory.add_to_papi_history(
645 self._node, csit_papi_command, **kwargs
647 self._api_command_list.append(
649 api_name=csit_papi_command, api_args=copy.deepcopy(kwargs)
655 self, err_msg=u"Failed to get statistics.", timeout=120,
656 socket=Constants.SOCKSTAT_PATH):
657 """Get VPP Stats from VPP Python API.
659 :param err_msg: The message used if the PAPI command(s) execution fails.
660 :param timeout: Timeout in seconds.
661 :param socket: Path to Stats socket to tunnel to.
665 :returns: Requested VPP statistics.
668 paths = [cmd[u"api_args"][u"path"] for cmd in self._api_command_list]
669 self._api_command_list = list()
671 stdout = self._execute_papi(
672 paths, method=u"stats", err_msg=err_msg, timeout=timeout,
676 return json.loads(stdout)
679 def _process_api_data(api_d):
680 """Process API data for smooth converting to JSON string.
682 Hex-encode string values (str.encode().hex()); other values are kept as is.
684 :param api_d: List of APIs with their arguments.
686 :returns: List of APIs with arguments pre-processed for JSON.
690 def process_value(val):
693 :param val: Value to be processed.
695 :returns: Processed value.
696 :rtype: dict or str or int
698 if isinstance(val, dict):
699 for val_k, val_v in val.items():
700 val[str(val_k)] = process_value(val_v)
702 elif isinstance(val, list):
703 for idx, val_l in enumerate(val):
704 val[idx] = process_value(val_l)
707 retval = val.encode().hex() if isinstance(val, str) else val
710 api_data_processed = list()
712 api_args_processed = dict()
713 for a_k, a_v in api[u"api_args"].items():
714 api_args_processed[str(a_k)] = process_value(a_v)
715 api_data_processed.append(
717 api_name=api[u"api_name"],
718 api_args=api_args_processed
721 return api_data_processed
724 self, api_data, method=u"request", err_msg=u"", timeout=120,
726 """Execute PAPI command(s) on remote node and store the result.
728 :param api_data: List of APIs with their arguments.
729 :param method: VPP Python API method. Supported methods are: 'request',
731 :param err_msg: The message used if the PAPI command(s) execution fails.
732 :param timeout: Timeout in seconds.
737 :returns: Stdout from remote python utility, to be parsed by caller.
739 :raises SSHTimeout: If PAPI command(s) execution has timed out.
740 :raises RuntimeError: If PAPI executor failed due to another reason.
741 :raises AssertionError: If PAPI command(s) execution has failed.
744 raise RuntimeError(u"No API data provided.")
746 json_data = json.dumps(api_data) \
747 if method in (u"stats", u"stats_request") \
748 else json.dumps(self._process_api_data(api_data))
750 sock = f" --socket {socket}" if socket else u""
751 cmd = f"{Constants.REMOTE_FW_DIR}/{Constants.RESOURCES_PAPI_PROVIDER}" \
752 f" --method {method} --data '{json_data}'{sock}"
754 ret_code, stdout, _ = self._ssh.exec_command_sudo(
755 cmd=cmd, timeout=timeout, log_stdout_err=False
757 # TODO: Fail on non-empty stderr?
760 f"PAPI command(s) execution timeout on host "
761 f"{self._node[u'host']}:\n{api_data}"
764 except Exception as exc:
766 f"PAPI command(s) execution on host {self._node[u'host']} "
767 f"failed: {api_data}"
770 raise AssertionError(err_msg)