From 2811062c2bb59f86901b56548da21432c00db2f9 Mon Sep 17 00:00:00 2001 From: Markus Frei Date: Mon, 25 May 2026 10:04:29 +0200 Subject: [PATCH] refactor(plugins): unify nextcloud/sqlite/gpg_key/ipa_diff + safe fixes + tests Bring the remaining in-house plugins to the standard style: standard file header, single quotes, f-strings (replacing the last .format() calls in sqlite_query and gpg_key), modern ansible.module_utils.common.text.converters instead of the deprecated _text, fixed import ordering, and removal of leftover boilerplate / commented-out debug code. ipa_diff gains the standard header it lacked. Security: - gpg_key no longer passes input_data (which contains the cleartext passphrase) into fail_json on a failed key generation. Safe fixes: - sqlite_query: REGEXP no longer raises on NULL column values (returns no-match); bare 'except:' narrowed to 'except Exception:'; mutable default argument replaced with None. Add unit tests: ipa_diff (pure diff helpers), sqlite_query (connect / select / regexp / close against a real temp DB) and gpg_key (match_key). Deferred (behaviour-changing, separate PRs): sqlite_query reporting a failed query as a successful run, and nextcloud_occ_app_config array idempotency. --- CHANGELOG.md | 2 + plugins/module_utils/ipa_diff.py | 24 +++-- plugins/modules/gpg_key.py | 35 +++---- plugins/modules/nextcloud_occ_app.py | 12 ++- plugins/modules/nextcloud_occ_app_config.py | 18 ++-- .../modules/nextcloud_occ_system_config.py | 18 ++-- plugins/modules/sqlite_query.py | 37 +++++--- .../plugins/module_utils/test_ipa_diff.py | 93 ++++++++++++++++++ tests/unit/plugins/modules/test_gpg_key.py | 78 +++++++++++++++ .../unit/plugins/modules/test_sqlite_query.py | 95 +++++++++++++++++++ 10 files changed, 344 insertions(+), 68 deletions(-) create mode 100644 tests/unit/plugins/module_utils/test_ipa_diff.py create mode 100644 tests/unit/plugins/modules/test_gpg_key.py create mode 100644 tests/unit/plugins/modules/test_sqlite_query.py diff --git a/CHANGELOG.md b/CHANGELOG.md index e38a2b1b..d8f1fad3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security +* **plugin:gpg_key**: The cleartext passphrase is no longer included in the module's failure output when key generation fails. * **role:repo_\***: HTTP basic auth credentials are now only written to the repository config files when a custom mirror URL is set. Previously, setting `lfops__repo_basic_auth_login` without `lfops__repo_mirror_url` wrote the credentials into repo files that still pointed at the public vendor mirrors, causing the package manager to send them to servers that do not use basic auth. The Icinga repo is intentionally unchanged, since its subscription URL legitimately requires basic auth. * **ci**: Scope `GITHUB_TOKEN` permissions in the dependabot-auto-merge workflow to the job level, with top-level now `read-all`. Matches the pattern used by the other LFOps workflows and addresses the OpenSSF Scorecard `Token-Permissions` finding. @@ -42,6 +43,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +* **plugin:sqlite_query**: A `REGEXP` query against a column that contains NULL values no longer fails; a NULL value simply does not match. * **plugin:uptimerobot_\***: The modules no longer crash when the UptimeRobot API returns a non-list response for a list endpoint; the response is passed through instead. * **plugin:nextcloud_occ_app_config, plugin:nextcloud_occ_system_config, plugin:uptimerobot_monitor, plugin:uptimerobot_psp**: Fixed their documentation so `ansible-doc` renders them again. A unit-test guard now catches this class of error for every in-house plugin. * **plugin:bitwarden_item**: Fixed the lookup's documentation so `ansible-doc` renders it again. diff --git a/plugins/module_utils/ipa_diff.py b/plugins/module_utils/ipa_diff.py index 2f308bdb..7f25fd54 100644 --- a/plugins/module_utils/ipa_diff.py +++ b/plugins/module_utils/ipa_diff.py @@ -1,12 +1,20 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8; py-indent-offset: 4 -*- +# +# Author: Linuxfabrik GmbH, Zurich, Switzerland +# Contact: info (at) linuxfabrik (dot) ch +# https://www.linuxfabrik.ch/ +# License: The Unlicense, see LICENSE file. + # Temporary diff helpers for ansible-freeipa modules. # Remove once https://github.com/freeipa/ansible-freeipa/pull/1415 # is merged and released. -from __future__ import (absolute_import, division, print_function) +from __future__ import absolute_import, division, print_function __metaclass__ = type -from ansible.module_utils._text import to_text +from ansible.module_utils.common.text.converters import to_text def _compare_key(arg, ipa_arg): @@ -28,7 +36,7 @@ def _compare_key(arg, ipa_arg): return arg == ipa_arg -class IPADiffTracker(object): +class IPADiffTracker: """Track before/after state for Ansible --diff output.""" def __init__(self): @@ -38,17 +46,17 @@ def build_diff(self): """Return kwargs for exit_json (empty dict if no changes).""" if not self._diffs: return {} - return {"diff": self._diffs} + return {'diff': self._diffs} def add_entry_diff(self, name, before, after): """Record a diff entry for one IPA object.""" if before == after: return self._diffs.append({ - "before_header": name, - "after_header": name, - "before": before, - "after": after, + 'before_header': name, + 'after_header': name, + 'before': before, + 'after': after, }) diff --git a/plugins/modules/gpg_key.py b/plugins/modules/gpg_key.py index ec507305..3a1f3c9d 100644 --- a/plugins/modules/gpg_key.py +++ b/plugins/modules/gpg_key.py @@ -1,10 +1,12 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- +#!/usr/bin/env python3 +# -*- coding: utf-8; py-indent-offset: 4 -*- +# +# Author: Linuxfabrik GmbH, Zurich, Switzerland +# Contact: info (at) linuxfabrik (dot) ch +# https://www.linuxfabrik.ch/ +# License: The Unlicense, see LICENSE file. -# Copyright: (c) 2022, Linuxfabrik GmbH, Zurich, Switzerland, https://www.linuxfabrik.ch -# The Unlicense (see LICENSE or https://unlicense.org/) - -from __future__ import (absolute_import, division, print_function) +from __future__ import absolute_import, division, print_function __metaclass__ = type @@ -244,14 +246,14 @@ import re import traceback -logger = logging.getLogger('gnupg') -logger.setLevel(logging.DEBUG) - from ansible.module_utils.basic import AnsibleModule -from ansible.module_utils._text import to_native +from ansible.module_utils.common.text.converters import to_native from ansible_collections.linuxfabrik.lfops.plugins.module_utils.gnupg import GPG +logger = logging.getLogger('gnupg') +logger.setLevel(logging.DEBUG) + # taken from https://www.iana.org/assignments/pgp-parameters/pgp-parameters.xhtml#pgp-parameters-12 algo_ids = { 1: 'RSA', @@ -362,12 +364,6 @@ def run_module(): supports_check_mode=True ) - # if debug - # console_logger = logging.StreamHandler() - # console_logger.setLevel(logging.DEBUG) - # console_logger.setFormatter(logging.Formatter('%(levelname)s: %(message)s')) - # logger.addHandler(console_logger) - gnupghome = module.params['gnupghome'] if gnupghome and not os.path.isdir(gnupghome): @@ -384,7 +380,7 @@ def run_module(): gnupghome=gnupghome, ) except (OSError, ValueError) as e: - module.fail_json(msg='There was an error executing gpg: {}'.format(to_native(e)), exception=traceback.format_exc(), **result) + module.fail_json(msg=f'There was an error executing gpg: {to_native(e)}', exception=traceback.format_exc(), **result) # use whatever logic you need to determine whether or not this module # made any modifications to your target @@ -425,11 +421,10 @@ def run_module(): params['no_protection'] = True input_data = gpg.gen_key_input(**params) - # print(params) - # print(input_data) new_key = gpg.gen_key(input_data) if not new_key: - module.fail_json(msg='Failed to generate a new key.', rc=new_key.returncode, stdout=new_key.data, stderr=new_key.stderr, input_data=input_data, **result) + # do not echo input_data here: it contains the cleartext passphrase + module.fail_json(msg='Failed to generate a new key.', rc=new_key.returncode, stdout=new_key.data, stderr=new_key.stderr, **result) # list the keys again, as we only got the fingerprint from gen_key() keys = gpg.list_keys(secret=True) diff --git a/plugins/modules/nextcloud_occ_app.py b/plugins/modules/nextcloud_occ_app.py index bb7856db..d379b145 100644 --- a/plugins/modules/nextcloud_occ_app.py +++ b/plugins/modules/nextcloud_occ_app.py @@ -1,8 +1,10 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -# Copyright: (c) 2026, Linuxfabrik GmbH, Zurich, Switzerland, https://www.linuxfabrik.ch -# The Unlicense (see LICENSE or https://unlicense.org/) +#!/usr/bin/env python3 +# -*- coding: utf-8; py-indent-offset: 4 -*- +# +# Author: Linuxfabrik GmbH, Zurich, Switzerland +# Contact: info (at) linuxfabrik (dot) ch +# https://www.linuxfabrik.ch/ +# License: The Unlicense, see LICENSE file. from __future__ import absolute_import, division, print_function diff --git a/plugins/modules/nextcloud_occ_app_config.py b/plugins/modules/nextcloud_occ_app_config.py index 515895a8..ffd583c5 100644 --- a/plugins/modules/nextcloud_occ_app_config.py +++ b/plugins/modules/nextcloud_occ_app_config.py @@ -1,8 +1,10 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -# Copyright: (c) 2026, Linuxfabrik GmbH, Zurich, Switzerland, https://www.linuxfabrik.ch -# The Unlicense (see LICENSE or https://unlicense.org/) +#!/usr/bin/env python3 +# -*- coding: utf-8; py-indent-offset: 4 -*- +# +# Author: Linuxfabrik GmbH, Zurich, Switzerland +# Contact: info (at) linuxfabrik (dot) ch +# https://www.linuxfabrik.ch/ +# License: The Unlicense, see LICENSE file. from __future__ import absolute_import, division, print_function @@ -128,10 +130,6 @@ def main(): installed_config_json=dict(type='raw'), ) - # the AnsibleModule object will be our abstraction working with Ansible - # this includes instantiation, a couple of common attr would be the - # args/params passed to the execution, as well as if this module - # supports check mode module = AnsibleModule( argument_spec=module_args, supports_check_mode=True, @@ -169,7 +167,7 @@ def main(): try: installed_config_json = json.loads(installed_config_json) except (json.JSONDecodeError, ValueError): - module.fail_json(msg=f'Failed to parse installed_config_json') + module.fail_json(msg='Failed to parse installed_config_json') app_configs = installed_config_json.get('apps', {}).get(app, {}) key_exists = name in app_configs diff --git a/plugins/modules/nextcloud_occ_system_config.py b/plugins/modules/nextcloud_occ_system_config.py index 88e995f7..2ae8e6da 100644 --- a/plugins/modules/nextcloud_occ_system_config.py +++ b/plugins/modules/nextcloud_occ_system_config.py @@ -1,8 +1,10 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -# Copyright: (c) 2026, Linuxfabrik GmbH, Zurich, Switzerland, https://www.linuxfabrik.ch -# The Unlicense (see LICENSE or https://unlicense.org/) +#!/usr/bin/env python3 +# -*- coding: utf-8; py-indent-offset: 4 -*- +# +# Author: Linuxfabrik GmbH, Zurich, Switzerland +# Contact: info (at) linuxfabrik (dot) ch +# https://www.linuxfabrik.ch/ +# License: The Unlicense, see LICENSE file. from __future__ import absolute_import, division, print_function @@ -119,10 +121,6 @@ def main(): installed_config_json=dict(type='raw'), ) - # the AnsibleModule object will be our abstraction working with Ansible - # this includes instantiation, a couple of common attr would be the - # args/params passed to the execution, as well as if this module - # supports check mode module = AnsibleModule( argument_spec=module_args, supports_check_mode=True, @@ -158,7 +156,7 @@ def main(): try: installed_config_json = json.loads(installed_config_json) except (json.JSONDecodeError, ValueError): - module.fail_json(msg=f'Failed to parse installed_config_json') + module.fail_json(msg='Failed to parse installed_config_json') # navigate nested config by path parts (e.g. "trusted_domains 0") current = installed_config_json.get('system', {}) diff --git a/plugins/modules/sqlite_query.py b/plugins/modules/sqlite_query.py index 4fc5b642..c354b177 100644 --- a/plugins/modules/sqlite_query.py +++ b/plugins/modules/sqlite_query.py @@ -1,8 +1,10 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- - -# Copyright: (c) 2026, Linuxfabrik GmbH, Zurich, Switzerland, https://www.linuxfabrik.ch -# The Unlicense (see LICENSE or https://unlicense.org/) +#!/usr/bin/env python3 +# -*- coding: utf-8; py-indent-offset: 4 -*- +# +# Author: Linuxfabrik GmbH, Zurich, Switzerland +# Contact: info (at) linuxfabrik (dot) ch +# https://www.linuxfabrik.ch/ +# License: The Unlicense, see LICENSE file. from __future__ import absolute_import, division, print_function @@ -107,14 +109,14 @@ ''' -from ansible.module_utils.basic import AnsibleModule +import os +import re +import sqlite3 +from ansible.module_utils.basic import AnsibleModule -# all sqlite functions taken from +# the close / connect / select / regexp helpers below are taken from # https://git.linuxfabrik.ch/linuxfabrik/lib/-/blob/master/db_mysql3.py -import os -import sqlite3 -import re def close(conn): @@ -124,7 +126,7 @@ def close(conn): """ try: conn.close() - except: + except Exception: pass return True @@ -143,16 +145,18 @@ def connect(path='', filename=''): conn.text_factory = str conn.create_function("REGEXP", 2, regexp) except Exception as e: - return(False, 'Connecting to DB {} failed, Error: {}, CWD: {}'.format(db, e, os.getcwd())) + return (False, f'Connecting to DB {db} failed, Error: {e}, CWD: {os.getcwd()}') return (True, conn) -def select(conn, sql, data={}, fetchone=False, as_dict=True): +def select(conn, sql, data=None, fetchone=False, as_dict=True): """The SELECT statement is used to query the database. The result of a SELECT is zero or more rows of data where each row has a fixed number of columns. A SELECT statement does not make any changes to the database. """ + if data is None: + data = {} c = conn.cursor() try: if data: @@ -171,7 +175,7 @@ def select(conn, sql, data={}, fetchone=False, as_dict=True): return (True, c.fetchone()) return (True, c.fetchall()) except Exception as e: - return(False, 'Query failed: {}, Error: {}, Data: {}'.format(sql, e, data)) + return (False, f'Query failed: {sql}, Error: {e}, Data: {data}') def regexp(expr, item): @@ -180,6 +184,9 @@ def regexp(expr, item): For Python, you have to implement REGEXP using a Python function at runtime. https://stackoverflow.com/questions/5365451/problem-with-regexp-python-and-sqlite/5365533#5365533 """ + if item is None: + # a NULL column value cannot match a regex (and re.search(None) raises) + return False reg = re.compile(expr) return reg.search(item) is not None @@ -215,7 +222,7 @@ def main(): success, conn = connect(path=path, filename=db) if not success: - module.fail_json(msg='Unable to connect to database: {}'.format(conn)) + module.fail_json(msg=f'Unable to connect to database: {conn}') query_result = [] if query_type == 'select': diff --git a/tests/unit/plugins/module_utils/test_ipa_diff.py b/tests/unit/plugins/module_utils/test_ipa_diff.py new file mode 100644 index 00000000..054d57ce --- /dev/null +++ b/tests/unit/plugins/module_utils/test_ipa_diff.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8; py-indent-offset: 4 -*- +# +# Author: Linuxfabrik GmbH, Zurich, Switzerland +# Contact: info (at) linuxfabrik (dot) ch +# https://www.linuxfabrik.ch/ +# License: The Unlicense, see LICENSE file. + +"""Unit tests for the ipa_diff module_util (pure --diff helpers). + +The collection import is wired up by tests/conftest.py. +""" + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +import unittest + +from ansible_collections.linuxfabrik.lfops.plugins.module_utils import ipa_diff + + +class TestCompareKey(unittest.TestCase): + + def test_scalar_equal(self): + self.assertTrue(ipa_diff._compare_key('a', 'a')) + self.assertFalse(ipa_diff._compare_key('a', 'b')) + + def test_list_order_insensitive(self): + self.assertTrue(ipa_diff._compare_key(['a', 'b'], ['b', 'a'])) + + def test_list_length_mismatch(self): + self.assertFalse(ipa_diff._compare_key(['a'], ['a', 'b'])) + + def test_scalar_promoted_to_list(self): + # ipa side is a one-element list, arg is the bare scalar + self.assertTrue(ipa_diff._compare_key('a', ['a'])) + + +class TestGenArgsDiff(unittest.TestCase): + + def test_only_changed_keys(self): + before, after = ipa_diff.gen_args_diff({'a': 'x', 'b': 'y'}, {'a': ['x'], 'b': ['z']}) + self.assertEqual(before, {'b': 'z'}) + self.assertEqual(after, {'b': 'y'}) + + def test_ignore_list(self): + before, after = ipa_diff.gen_args_diff({'a': 'x'}, {'a': ['z']}, ignore=['a']) + self.assertEqual((before, after), ({}, {})) + + def test_empty_args(self): + self.assertEqual(ipa_diff.gen_args_diff({}, {'a': ['x']}), ({}, {})) + + +class TestGenMemberDiff(unittest.TestCase): + + def test_no_change(self): + self.assertEqual(ipa_diff.gen_member_diff('member_user', [], [], ['a']), ({}, {})) + + def test_add_and_delete(self): + before, after = ipa_diff.gen_member_diff('member_user', ['c'], ['a'], ['a', 'b']) + self.assertEqual(before, {'member_user': ['a', 'b']}) + self.assertEqual(after, {'member_user': ['b', 'c']}) + + +class TestMergeDiffs(unittest.TestCase): + + def test_merge(self): + before, after = ipa_diff.merge_diffs(({'a': 1}, {'a': 2}), ({'b': 3}, {'b': 4})) + self.assertEqual(before, {'a': 1, 'b': 3}) + self.assertEqual(after, {'a': 2, 'b': 4}) + + +class TestIPADiffTracker(unittest.TestCase): + + def test_empty_build(self): + self.assertEqual(ipa_diff.IPADiffTracker().build_diff(), {}) + + def test_skips_equal_entries(self): + t = ipa_diff.IPADiffTracker() + t.add_entry_diff('host1', {'x': 1}, {'x': 1}) + self.assertEqual(t.build_diff(), {}) + + def test_records_changed_entry(self): + t = ipa_diff.IPADiffTracker() + t.add_entry_diff('host1', {'x': 1}, {'x': 2}) + diff = t.build_diff() + self.assertEqual(len(diff['diff']), 1) + self.assertEqual(diff['diff'][0]['before_header'], 'host1') + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/unit/plugins/modules/test_gpg_key.py b/tests/unit/plugins/modules/test_gpg_key.py new file mode 100644 index 00000000..e7d7ac53 --- /dev/null +++ b/tests/unit/plugins/modules/test_gpg_key.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8; py-indent-offset: 4 -*- +# +# Author: Linuxfabrik GmbH, Zurich, Switzerland +# Contact: info (at) linuxfabrik (dot) ch +# https://www.linuxfabrik.ch/ +# License: The Unlicense, see LICENSE file. + +"""Unit tests for the gpg_key module's pure key-matching logic. + +match_key decides whether an existing key satisfies the requested +attributes (used for idempotency); it takes plain dicts and needs no gpg +binary. The collection import is wired up by tests/conftest.py. +""" + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +import copy +import unittest + +from ansible_collections.linuxfabrik.lfops.plugins.modules import gpg_key + + +_KEY = { + 'algo': '1', # 1 -> RSA + 'length': '2048', + 'uids': ['Test Name (a comment) '], +} + +_PARAMS = { + 'key_type': 'RSA', + 'key_length': 2048, + 'name_real': 'Test Name', + 'name_comment': 'a comment', + 'name_email': 'test@example.com', + 'subkey_type': None, + 'subkey_length': None, +} + + +class TestMatchKey(unittest.TestCase): + + def test_full_match(self): + self.assertTrue(gpg_key.match_key(copy.deepcopy(_KEY), dict(_PARAMS))) + + def test_wrong_length(self): + params = dict(_PARAMS, key_length=4096) + self.assertFalse(gpg_key.match_key(copy.deepcopy(_KEY), params)) + + def test_wrong_type(self): + params = dict(_PARAMS, key_type='DSA') + self.assertFalse(gpg_key.match_key(copy.deepcopy(_KEY), params)) + + def test_wrong_email(self): + params = dict(_PARAMS, name_email='other@example.com') + self.assertFalse(gpg_key.match_key(copy.deepcopy(_KEY), params)) + + def test_wrong_real_name(self): + params = dict(_PARAMS, name_real='Someone Else') + self.assertFalse(gpg_key.match_key(copy.deepcopy(_KEY), params)) + + def test_subkey_match(self): + key = copy.deepcopy(_KEY) + key['subkey_info'] = {'sub1': {'algo': '1', 'length': '2048'}} + params = dict(_PARAMS, subkey_type='RSA', subkey_length=2048) + self.assertTrue(gpg_key.match_key(key, params)) + + def test_subkey_no_match(self): + key = copy.deepcopy(_KEY) + key['subkey_info'] = {'sub1': {'algo': '1', 'length': '1024'}} + params = dict(_PARAMS, subkey_type='RSA', subkey_length=2048) + self.assertFalse(gpg_key.match_key(key, params)) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/unit/plugins/modules/test_sqlite_query.py b/tests/unit/plugins/modules/test_sqlite_query.py new file mode 100644 index 00000000..a162c3df --- /dev/null +++ b/tests/unit/plugins/modules/test_sqlite_query.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8; py-indent-offset: 4 -*- +# +# Author: Linuxfabrik GmbH, Zurich, Switzerland +# Contact: info (at) linuxfabrik (dot) ch +# https://www.linuxfabrik.ch/ +# License: The Unlicense, see LICENSE file. + +"""Unit tests for the sqlite_query module helpers. + +These exercise connect / select / regexp / close against a real +temporary SQLite database (sqlite3 is in the standard library, so no +mocking is needed). The collection import is wired up by +tests/conftest.py. +""" + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +import os +import tempfile +import unittest + +from ansible_collections.linuxfabrik.lfops.plugins.modules import sqlite_query as mod + + +class SqliteHelpersTestCase(unittest.TestCase): + + def setUp(self): + self.tmpdir = tempfile.mkdtemp(prefix='lfops_sqlite_test_') + ok, self.conn = mod.connect(path=self.tmpdir, filename='test.db') + self.assertTrue(ok) + cur = self.conn.cursor() + cur.execute('CREATE TABLE t (id INTEGER, name TEXT)') + cur.execute("INSERT INTO t VALUES (1, 'alpha'), (2, 'beta'), (3, NULL)") + self.conn.commit() + + def tearDown(self): + mod.close(self.conn) + + +class TestSelect(SqliteHelpersTestCase): + + def test_as_dict(self): + ok, rows = mod.select(self.conn, 'SELECT id, name FROM t WHERE id = 1') + self.assertTrue(ok) + self.assertEqual(rows, [{'id': 1, 'name': 'alpha'}]) + + def test_as_tuple(self): + ok, rows = mod.select(self.conn, 'SELECT id, name FROM t WHERE id = 1', as_dict=False) + self.assertTrue(ok) + self.assertEqual(tuple(rows[0]), (1, 'alpha')) + + def test_fetch_one(self): + ok, row = mod.select(self.conn, 'SELECT id FROM t ORDER BY id', fetchone=True) + self.assertTrue(ok) + self.assertEqual(row, {'id': 1}) + + def test_fetch_one_empty(self): + ok, row = mod.select(self.conn, 'SELECT id FROM t WHERE id = 999', fetchone=True) + self.assertTrue(ok) + self.assertEqual(row, []) + + def test_named_args(self): + ok, rows = mod.select( + self.conn, 'SELECT name FROM t WHERE id = :wanted', data={'wanted': 2}, + ) + self.assertTrue(ok) + self.assertEqual(rows, [{'name': 'beta'}]) + + def test_bad_query_reports_failure(self): + ok, msg = mod.select(self.conn, 'SELECT * FROM does_not_exist') + self.assertFalse(ok) + self.assertIn('Query failed', msg) + + +class TestRegexp(SqliteHelpersTestCase): + + def test_regexp_in_where(self): + ok, rows = mod.select(self.conn, "SELECT name FROM t WHERE name REGEXP '^al'") + self.assertTrue(ok) + self.assertEqual(rows, [{'name': 'alpha'}]) + + +class TestConnectFailure(unittest.TestCase): + + def test_connect_to_unwritable_path(self): + ok, result = mod.connect(path='/nonexistent/dir/that/should/not/exist', filename='x.db') + self.assertFalse(ok) + self.assertIn('failed', result.lower()) + + +if __name__ == '__main__': + unittest.main()