Source code for msl.network.utils

"""
Common functions used by MSL-Network.
"""
import ast
import logging
import os
import re

from .constants import DISCONNECT_REQUEST

_args_regex = re.compile(r'\s*([^\"\s]+|\"[^\"]*\")')

_kwargs_regex = re.compile(r'(\w+)\s*=\s*([^\"\s]+|\"[^\"]*\")')

_is_manager_regex = re.compile(r'^Manager\[\S+:\d+]$')

_numeric_address_regex = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')

_oid_regex = re.compile(r'oid=(.+), name=(.+)\)')

_is_username_invalid_regex = re.compile(r':\d+$')

logger = logging.getLogger(__package__)


[docs]def ensure_root_path(path): """Ensure that the root directory of the file path exists. Parameters ---------- path : :class:`str` A file path. For example, if `path` is ``/the/path/to/my/test/file.txt`` then this function would ensure that the ``/the/path/to/my/test`` directories exist (creating the intermediate directories if necessary). """ if path is not None: root = os.path.dirname(path) if root and not os.path.isdir(root): os.makedirs(root)
[docs]def parse_terminal_input(line): """Parse text from a terminal connection. See, :ref:`terminal-input` for more details. .. _JSON: https://www.json.org/ Parameters ---------- line : :class:`str` The input text from the terminal. Returns ------- :class:`dict` The JSON_ object. """ def convert_value(val): val_ = val.lower() if val_ == 'false': return False elif val_ == 'true': return True elif val_ == 'null' or val_ == 'none': return None else: try: return ast.literal_eval(val) except: # noqa return val line = line.strip() line_lower = line.lower() if line_lower == 'identity': return { 'service': 'Manager', 'attribute': 'identity', 'args': [], 'kwargs': {}, 'uid': '', 'error': False, } elif line_lower.startswith('client'): values = line.split() # can also specify a name for the Client, e.g., client Me and Myself name = 'Client' if len(values) == 1 else ' '.join(values[1:]) return { 'type': 'client', 'name': name.replace('"', ''), 'language': 'unknown', 'os': 'unknown', 'error': False, } elif line_lower == DISCONNECT_REQUEST or line_lower == 'disconnect' or line_lower == 'exit': return { 'service': 'self', 'attribute': DISCONNECT_REQUEST, 'args': [], 'kwargs': {}, 'uid': '', 'error': False, } elif line_lower.startswith('link'): return { 'service': 'Manager', 'attribute': 'link', 'args': [line[4:].strip().replace('"', '')], 'kwargs': {}, 'uid': '', 'error': False, } else: line = line.replace("'", '"') if line.startswith('"'): line = line.split('"', maxsplit=2) items = [item.strip() for item in line if item.strip()] if len(items) > 1: items = [items[0]] + items[1].split(None, maxsplit=1) else: items = line.split(None, maxsplit=2) if len(items) < 2: # then the name of the service and/or attribute was not set return None service = items[0].replace('"', '') attribute = items[1].replace('"', '') if len(items) == 2: # no parameters return { 'service': service, 'attribute': attribute, 'args': [], 'kwargs': {}, 'uid': '', 'error': False, } else: args = [convert_value(m.groups()[0]) for m in re.finditer(_args_regex, items[2])] kwargs = dict() for i, m in enumerate(re.finditer(_kwargs_regex, items[2])): key, value = m.groups() if i == 0: args = [convert_value(m.groups()[0]) for m in re.finditer(_args_regex, items[2].split(key)[0])] kwargs[key] = convert_value(value) return { 'service': service, 'attribute': attribute, 'args': args, 'kwargs': kwargs, 'uid': '', 'error': False, }