Source code for msl.network.database

"""
Databases that are used by the Network :class:`~msl.network.manager.Manager`.
"""
import os
import sqlite3
from datetime import datetime

from cryptography.exceptions import InvalidKey
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

from .constants import DATABASE
from .constants import LOCALHOST_ALIASES
from .utils import _is_username_invalid_regex
from .utils import logger


[docs] class Database(object): def __init__(self, database, **kwargs): """Base class for connecting to a SQLite database. Automatically creates the database if it does not already exist. Parameters ---------- database : :class:`str` The path to the database file, or ``':memory:'`` to open a connection to a database that resides in RAM instead of on disk. kwargs Optional keyword arguments to pass to :func:`sqlite3.connect`. """ self._path = database if database is not None else DATABASE self._connection = None # open the connection to the database if self._path == ':memory:': logger.debug('creating a database in RAM') elif not os.path.isfile(self._path): logger.debug('creating a new database %s', self._path) else: logger.debug('opening %s', self._path) kwargs.setdefault('timeout', 60.0) self._connection = sqlite3.connect(self._path, **kwargs) self._cursor = self._connection.cursor() def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() @property def path(self): """:class:`str`: The path to the database file.""" return self._path @property def connection(self): """:class:`sqlite3.Connection`: The connection object.""" return self._connection @property def cursor(self): """:class:`sqlite3.Cursor`: The cursor object.""" return self._cursor def __del__(self): self.close()
[docs] def close(self): """Closes the connection to the database.""" if self._connection is not None: self._connection.close() self._connection = None try: logger.debug('closed %s', self._path) except (NameError, ValueError): # These errors could occur when Python is exiting # ValueError: I/O operation on closed file # NameError: name 'open' is not defined pass
[docs] def execute(self, sql, parameters=None): """Wrapper around :meth:`sqlite3.Cursor.execute`. Parameters ---------- sql : :class:`str` The SQL command to execute parameters : :class:`list`, :class:`tuple` or :class:`dict`, optional Only required if the `sql` command is parameterized. """ if parameters is None: self._cursor.execute(sql) else: self._cursor.execute(sql, parameters)
[docs] def tables(self): """:class:`list` of :class:`str`: A list of the names of each table that is in the database.""" self.execute("SELECT name FROM sqlite_master WHERE type='table';") return sorted([t[0] for t in self._cursor.fetchall() if t[0] != 'sqlite_sequence'])
[docs] def table_info(self, name): """Returns the information about each column in the specified table. Parameters ---------- name : :class:`str` The name of the table to get the information of. Returns ------- :class:`list` of :class:`tuple` The list of the fields in the table. The indices of each tuple correspond to: * 0 - id number of the column * 1 - the name of the column * 2 - the datatype of the column * 3 - whether a value in the column can be NULL (0 or 1) * 4 - the default value for the column * 5 - whether the column is used as a primary key (0 or 1) """ self.execute(f'PRAGMA table_info({name!r});') return self._cursor.fetchall()
[docs] def column_names(self, table_name): """Returns the names of the columns in the specified table. Parameters ---------- table_name : :class:`str` The name of the table. Returns ------- :class:`list` of :class:`str` A list of the names of each column in the table. """ return [item[1] for item in self.table_info(table_name)]
[docs] def column_datatypes(self, table_name): """Returns the datatype of each column in the specified table. Parameters ---------- table_name : :class:`str` The name of the table. Returns ------- :class:`list` of :class:`str` A list of the datatypes of each column in the table. """ return [item[2] for item in self.table_info(table_name)]
[docs] class ConnectionsTable(Database): NAME = 'connections' """:class:`str`: The name of the table in the database.""" def __init__(self, *, database=None, as_datetime=False, **kwargs): """The database table for devices that have connected to the Network :class:`~msl.network.manager.Manager`. Parameters ---------- database : :class:`str`, optional The path to the database file, or ``':memory:'`` to open a connection to a database that resides in RAM instead of on disk. If :data:`None` then loads the default database. as_datetime : :class:`bool`, optional Whether to fetch the timestamps from the database as :class:`datetime.datetime` objects. If :data:`False` then the timestamps will be of type :class:`str`. kwargs Optional keyword arguments to pass to :func:`sqlite3.connect`. """ if as_datetime and 'detect_types' not in kwargs: kwargs['detect_types'] = sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES super(ConnectionsTable, self).__init__(database, **kwargs) self.execute(f'CREATE TABLE IF NOT EXISTS {self.NAME} (' f'pid INTEGER PRIMARY KEY AUTOINCREMENT, ' f'datetime DATETIME NOT NULL, ' f'ip_address TEXT NOT NULL, ' f'domain TEXT NOT NULL, ' f'port INTEGER NOT NULL, ' f'message TEXT NOT NULL);') self.connection.commit()
[docs] def insert(self, peer, message): """Insert a message about what happened when a device connected. Parameters ---------- peer : :class:`~msl.network.manager.Peer` The peer that connected to the Network :class:`~msl.network.manager.Manager`. message : :class:`str` The message about what happened (e.g, the connection was successful, or it failed). """ now = datetime.now().replace(microsecond=0).isoformat(sep='T') self.execute(f'INSERT INTO {self.NAME} VALUES(NULL, ?, ?, ?, ?, ?);', (now, peer.ip_address, peer.domain, peer.port, message)) self.connection.commit()
[docs] def connections(self, *, start=None, end=None): """Return the information of the devices that have connected to the Network :class:`~msl.network.manager.Manager`. .. versionchanged:: 1.0 Use ``T`` as the separator between the date and time. Renamed `timestamp1` to `start`. Renamed `timestamp2` to `end`. Parameters ---------- start : :class:`datetime.datetime` or :class:`str`, optional Include all records that have a timestamp :math:`\\ge` `start`. If a :class:`str` then in the ``yyyy-mm-dd`` or ``yyyy-mm-ddTHH:MM:SS`` format. end : :class:`datetime.datetime` or :class:`str`, optional Include all records that have a timestamp :math:`\\le` `end`. If a :class:`str` then in the ``yyyy-mm-dd`` or ``yyyy-mm-ddTHH:MM:SS`` format. Returns ------- :class:`list` of :class:`tuple` The connection records. """ pre = f'SELECT * FROM {self.NAME}' if start is None and end is None: self.execute(f'{pre};') elif start is not None and end is None: self.execute(f'{pre} WHERE timestamp >= ?;', (start,)) elif start is None and end is not None: self.execute(f'{self.NAME} WHERE timestamp <= ?;', (end,)) else: self.execute(f'{pre} WHERE timestamp >= ? AND timestamp <= ?;', (start, end)) return self.cursor.fetchall()
[docs] class HostnamesTable(Database): NAME = 'auth_hostnames' """:class:`str`: The name of the table in the database.""" def __init__(self, *, database=None, **kwargs): """The database table for trusted hostname's that are allowed to connect to the Network :class:`~msl.network.manager.Manager`. Parameters ---------- database : :class:`str`, optional The path to the database file, or ``':memory:'`` to open a connection to a database that resides in RAM instead of on disk. If :data:`None` then loads the default database. kwargs Optional keyword arguments to pass to :func:`sqlite3.connect`. """ super(HostnamesTable, self).__init__(database, **kwargs) self.execute(f'CREATE TABLE IF NOT EXISTS {self.NAME} ' f'(hostname TEXT NOT NULL, UNIQUE(hostname));') self.connection.commit() if not self.hostnames(): for hostname in LOCALHOST_ALIASES: self.insert(hostname)
[docs] def insert(self, hostname): """Insert a hostname. If the hostname is already in the table then it does not insert it again. Parameters ---------- hostname : :class:`str` The trusted hostname. """ self.execute(f'INSERT OR IGNORE INTO {self.NAME} VALUES(?);', (hostname,)) self.connection.commit()
[docs] def delete(self, hostname): """Delete a hostname. Parameters ---------- hostname : :class:`str` A hostname in the table. Raises ------ ValueError If `hostname` is not in the table. """ # want to know if this hostname is not in the table if hostname not in self.hostnames(): raise ValueError(f'Cannot delete {hostname!r}. This hostname is not in the table.') self.execute(f'DELETE FROM {self.NAME} WHERE hostname = ?;', (hostname,)) self.connection.commit()
[docs] def hostnames(self): """:class:`list` of :class:`str`: Returns all the trusted hostnames.""" self.execute(f'SELECT * FROM {self.NAME};') return sorted([item[0] for item in self.cursor.fetchall()])
[docs] class UsersTable(Database): NAME = 'auth_users' """:class:`str`: The name of the table in the database.""" def __init__(self, *, database=None, **kwargs): """The database table for keeping information about a users login credentials for connecting to a Network :class:`~msl.network.manager.Manager`. Parameters ---------- database : :class:`str`, optional The path to the database file, or ``':memory:'`` to open a connection to a database that resides in RAM instead of on disk. If :data:`None` then loads the default database. kwargs Optional keyword arguments to pass to :func:`sqlite3.connect`. """ super(UsersTable, self).__init__(database, **kwargs) self.execute(f'CREATE TABLE IF NOT EXISTS {self.NAME} (' f'pid INTEGER PRIMARY KEY AUTOINCREMENT, ' f'username TEXT NOT NULL, ' f'key BLOB NOT NULL, ' f'salt BLOB NOT NULL, ' f'is_admin BOOLEAN NOT NULL, ' f'UNIQUE(username));') self.connection.commit() self._salt_size = 16 self._length = 32 self._iterations = 100000 self._algorithm = hashes.SHA256()
[docs] def insert(self, username, password, is_admin): """Insert a new user. The password is encrypted and stored in the database using PBKDF2_ .. _PBKDF2: https://en.wikipedia.org/wiki/PBKDF2 To update the values for a user use :meth:`update`. Parameters ---------- username : :class:`str` The name of the user. password : :class:`str` The password of the user in plain-text format. is_admin : :class:`bool` Does this user have admin rights? Raises ------- ValueError If the `username` is invalid or if `password` is empty. """ if _is_username_invalid_regex.search(username) is not None: raise ValueError('A username cannot end with ":<integer>"') if not password: raise ValueError(f'You must specify a password for {username!r}') salt = os.urandom(self._salt_size) kdf = PBKDF2HMAC( algorithm=self._algorithm, length=self._length, salt=salt, iterations=self._iterations, ) key = kdf.derive(password.encode()) try: self.execute(f'INSERT INTO {self.NAME} VALUES(NULL, ?, ?, ?, ?);', (username, key, salt, bool(is_admin))) except sqlite3.IntegrityError: raise ValueError(f'A user with the name {username!r} already exists') from None self.connection.commit()
[docs] def update(self, username, *, password=None, is_admin=None): """Update either the salt used for the password and/or the admin rights. Parameters ---------- username : :class:`str` The name of the user. password : :class:`str`, optional The password of the user in plain-text format. is_admin : :class:`bool`, optional Does this user have admin rights? Raises ------ ValueError If `username` is not in the table. If both `password` and `is_admin` are not specified. If `password` is an empty string. """ self._ensure_user_exists(username, 'update') if password is None and is_admin is None: raise ValueError('Must specify either the password and/or the admin rights when updating') if password is None: self.execute(f'UPDATE {self.NAME} SET is_admin=? WHERE username=?;', (bool(is_admin), username)) self.connection.commit() return if not password: raise ValueError(f'You must specify a password for {username!r}') salt = os.urandom(self._salt_size) key = PBKDF2HMAC( algorithm=self._algorithm, length=self._length, salt=salt, iterations=self._iterations, ).derive(password.encode()) if is_admin is None: self.execute(f'UPDATE {self.NAME} SET key=?, salt=? WHERE username=?;', (key, salt, username)) else: self.execute(f'UPDATE {self.NAME} SET key=?, salt=?, is_admin=? WHERE username=?;', (key, salt, bool(is_admin), username)) self.connection.commit()
[docs] def delete(self, username): """Delete a user. Parameters ---------- username : :class:`str` The name of the user. Raises ------ ValueError If `username` is not in the table. """ self._ensure_user_exists(username, 'delete') self.execute(f'DELETE FROM {self.NAME} WHERE username = ?;', (username,)) self.connection.commit()
[docs] def get_user(self, username): """Get the information about a user. Parameters ---------- username : :class:`str` The name of the user. Returns ------- :class:`tuple` Returns (pid, username, key, salt, is_admin) for the specified `username`. """ self.execute(f'SELECT * FROM {self.NAME} WHERE username = ?;', (username,)) return self.cursor.fetchone()
[docs] def records(self): """:class:`list` of :class:`tuple`: Returns [(pid, username, key, salt, is_admin), ...] for all users.""" self.execute(f'SELECT * FROM {self.NAME};') return self.cursor.fetchall()
[docs] def usernames(self): """:class:`list` of :class:`str`: Returns the names of all registered users.""" self.execute(f'SELECT username FROM {self.NAME};') return [item[0] for item in self.cursor.fetchall()]
[docs] def users(self): """:class:`list` of :class:`tuple`: Returns [(username, is_admin), ... ] for all users.""" self.execute(f'SELECT username,is_admin FROM {self.NAME};') return sorted([(item[0], bool(item[1])) for item in self.cursor.fetchall()])
[docs] def is_user_registered(self, username): """:class:`bool`: Whether `username` is a registered user.""" self.execute(f'SELECT count(*) FROM {self.NAME} WHERE username = ?;', (username,)) return bool(self.cursor.fetchone()[0])
[docs] def is_password_valid(self, username, password): """Check whether the password matches the encrypted password in the database. Parameters ---------- username : :class:`str` The name of the user. password : :class:`str` The password to check (in plain-text format). Returns ------- :class:`bool` Whether `password` matches the password in the database for the user. """ self.execute(f'SELECT key,salt FROM {self.NAME} WHERE username = ?;', (username,)) key_salt = self._cursor.fetchone() if not key_salt: return False kdf = PBKDF2HMAC( algorithm=self._algorithm, length=self._length, salt=key_salt[1], iterations=self._iterations, ) try: kdf.verify(password.encode(), key_salt[0]) return True except InvalidKey: return False
[docs] def is_admin(self, username): """Check whether a user has admin rights. Parameters ---------- username : :class:`str` The name of the user. Returns ------- :class:`bool` Whether the user has admin rights. """ self.execute(f'SELECT is_admin FROM {self.NAME} WHERE username = ?;', (username,)) user = self.cursor.fetchone() if user: return bool(user[0]) return False
def _ensure_user_exists(self, username, action): # want to know if this user is not in the table if username not in self.usernames(): raise ValueError( f'Cannot {action} {username!r}. ' f'This user is not in the table.' )
[docs] def convert_datetime(value): """Convert a date and time to a :class:`~datetime.datetime` object. Parameters ---------- value : :class:`bytes` The datetime value from an SQLite database. Returns ------- :class:`datetime.datetime` The `value` as a datetime object. """ try: # datetime.fromisoformat is available in Python 3.7+ return datetime.fromisoformat(value.decode()) except AttributeError: # mimics the sqlite3.dbapi2.convert_timestamp function datepart, timepart = value[:10], value[11:] year, month, day = map(int, datepart.split(b'-')) timepart_full = timepart.split(b'.') hours, minutes, seconds = map(int, timepart_full[0].split(b':')) if len(timepart_full) == 2: microseconds = int(f'{timepart_full[1].decode():0<6.6}') else: microseconds = 0 return datetime(year, month, day, hours, minutes, seconds, microseconds)
# Do not use the builtin TIMESTAMP converter since it does not support # the T separator between the date and time. Also, according to # https://www.sqlite.org/lang_datefunc.html the name DATETIME seems # to be more logical than TIMESTAMP as a field name. sqlite3.register_converter('DATETIME', convert_datetime)