commit e975fec22beb07c7dcd72150a059dcc7dfb274ac
parent ceea577b1cfd03f242d1be97f9337f36cb85c4c2
Author: Andrew D. Laack <andrew@laack.co>
Date: Sat, 21 Jun 2025 10:41:04 -0500
Refactor tests (#8)
* Added dataclasses to better abstract tests.
* Fixed issue with malformed json output
* Updated grammar throughout
* Fixed an issue where relative file paths impacted blame caching. This was done by resolving the abs. file path for caching.
* Reran tests, verified perf update, bumped version number.
Diffstat:
9 files changed, 200 insertions(+), 200 deletions(-)
diff --git a/pyproject.toml b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "ratchets"
-version = "0.2.10"
+version = "0.2.11"
description = "Ratcheted testing in Python."
authors = [
{ name = "Andrew Laack", email = "andrew@laack.co" }
diff --git a/ratchet_values.json b/ratchet_values.json
@@ -0,0 +1,10 @@
+{
+ "exceptions": 0,
+ "lightning": 0,
+ "tabs": 0,
+ "no_wildcard_imports": 0,
+ "trailing_whitespace": 0,
+ "no_todo_comments": 0,
+ "ensure_trailing_newline": 0,
+ "line_too_long": 0
+}+
\ No newline at end of file
diff --git a/src/ratchets/abstracted_tests.py b/src/ratchets/abstracted_tests.py
@@ -3,6 +3,7 @@ import json
import toml
from pathlib import Path
from typing import Dict, Any, List
+from ratchets.results import MatchResult, TestResult
from .run_tests import (
evaluate_regex_tests,
@@ -75,26 +76,22 @@ def get_filtered_files() -> List[Path]:
return files
-def get_python_test_matches(
- test_name: str, rule: Dict[str, Any]
-) -> List[Dict[str, Any]]:
- """Run a regex test for a single rule and return matches."""
- files = get_filtered_files()
- results: Dict[str, List[Dict[str, Any]]] = evaluate_regex_tests(
- files, {test_name: rule}
- )
- return results.get(test_name, [])
+def get_python_test_matches(test_name: str, rule: Dict[str, Any]) -> List[MatchResult]:
+ """Run a regex test for a single rule and return MatchResult objects."""
+ files = get_filtered_files() # expected to return List[Path] or similar
+ results: Dict[str, TestResult] = evaluate_regex_tests(files, {test_name: rule})
+ tr = results.get(test_name)
+ return tr.matches if tr is not None else []
def get_shell_test_matches(
test_name: str, test_dict: Dict[str, Any]
-) -> List[Dict[str, Any]]:
- """Run a shell test for a single rule and return matches."""
+) -> List[MatchResult]:
+ """Run a shell test for a single rule and return MatchResult objects."""
files = get_filtered_files()
- results: Dict[str, List[Dict[str, Any]]] = evaluate_shell_tests(
- files, {test_name: test_dict}
- )
- return results.get(test_name, [])
+ results: Dict[str, TestResult] = evaluate_shell_tests(files, {test_name: test_dict})
+ tr = results.get(test_name)
+ return tr.matches if tr is not None else []
def check_regex_rule(test_name: str, rule: Dict[str, Any]) -> None:
diff --git a/src/ratchets/caching.py b/src/ratchets/caching.py
@@ -2,23 +2,16 @@ import sqlite3
import argparse
from datetime import datetime
from typing import Optional, Dict, List
+from dataclasses import dataclass
+@dataclass
class BlameRecord:
- def __init__(
- self,
- line_content: str,
- line_number: int,
- timestamp: datetime,
- file_name: str,
- author: str,
- ):
- """Creates a record based on required fields for compatability with blame cache."""
- self.line_content = line_content
- self.line_number = line_number
- self.timestamp = timestamp
- self.file_name = file_name
- self.author = author
+ line_content: str
+ line_number: int
+ timestamp: datetime
+ file_name: str
+ author: str
class CachingDatabase:
@@ -59,10 +52,7 @@ class CachingDatabase:
conn.close()
def create_or_update_blames(self, blames: List[BlameRecord]):
- """
- Insert or update a list of blames:
- if (file_name, line_number) exists, update it; otherwise insert.
- """
+ """Insert or update a list of blames."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
@@ -98,10 +88,7 @@ class CachingDatabase:
conn.close()
def create_or_update_blame(self, blame: BlameRecord):
- """
- Insert or update a blame:
- if (file_name, line_number) exists, update it; otherwise insert.
- """
+ """Insert or update a blame."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
diff --git a/src/ratchets/results.py b/src/ratchets/results.py
@@ -0,0 +1,18 @@
+from datetime import datetime
+from typing import Optional, List
+from dataclasses import dataclass
+
+
+@dataclass
+class MatchResult:
+ file: str
+ line: Optional[int]
+ content: str
+ blame_author: Optional[str] = None
+ blame_time: Optional[datetime] = None
+
+
+@dataclass
+class TestResult:
+ name: str
+ matches: List[MatchResult]
diff --git a/src/ratchets/run_tests.py b/src/ratchets/run_tests.py
@@ -1,3 +1,4 @@
+from ratchets.results import TestResult, MatchResult
from ratchets.caching import CachingDatabase, BlameRecord
import queue
from datetime import datetime
@@ -116,7 +117,7 @@ def evaluate_tests(
regex_only: bool,
paths: Optional[List[str]],
override_filter: bool = False,
-) -> Tuple[Dict[str, List[Dict[str, Any]]], Dict[str, List[Dict[str, Any]]]]:
+) -> Tuple[Dict[str, TestResult], Dict[str, TestResult]]:
"""Runs all requested tests based on the 'path' .toml file."""
assert os.path.isfile(path)
@@ -134,8 +135,8 @@ def evaluate_tests(
if not override_filter:
files = filter_excluded_files(files, excluded_path, ignore_path)
- regex_issues: Dict[str, List[Dict[str, Any]]] = {}
- shell_issues: Dict[str, List[Dict[str, Any]]] = {}
+ regex_issues: Dict[str, TestResult] = {}
+ shell_issues: Dict[str, TestResult] = {}
if regex_tests and not cmd_only:
regex_issues = evaluate_regex_tests(files, regex_tests)
@@ -144,15 +145,16 @@ def evaluate_tests(
return regex_issues, shell_issues
-def print_issues(issues: Dict[str, List[Dict[str, Any]]]) -> None:
- """Print the 'issues' dict in a human readable way."""
- for test_name, matches in issues.items():
- if matches:
- print(f"\n{test_name} — matched {len(matches)} issue(s):")
- for match in matches:
- file_path = match["file"]
- line = match.get("line")
- content = match["content"]
+def print_issues(results: Dict[str, TestResult]) -> None:
+ """Print TestResult objects in a human-readable way."""
+ for test_name, tr in results.items():
+ num = len(tr.matches)
+ if num:
+ print(f"\n{test_name} — matched {num} issue{'s' if num != 1 else ''}:")
+ for m in tr.matches:
+ file_path = m.file
+ line = m.line
+ content = m.content or ""
truncated = content if len(content) <= 50 else content[:50] + "..."
if line is not None:
print(f" -> {file_path}:{line}: {truncated}")
@@ -181,35 +183,35 @@ def load_ratchet_results(file_location: Optional[str] = None) -> Dict[str, Any]:
def evaluate_regex_tests(
files: List[Path], test_str: Dict[str, Dict[str, Any]]
-) -> Dict[str, List[Dict[str, Any]]]:
+) -> Dict[str, TestResult]:
"""Evaluate a list of regex tests in parallel with one thread per test."""
- if len(files) == 0:
+ if not files:
raise Exception("No files were passed in to be evaluated.")
- if len(test_str) == 0:
+ if not test_str:
raise Exception("No regex tests were passed in to be evaluated.")
- results: Dict[str, List[Dict[str, Any]]] = {}
+ results: Dict[str, TestResult] = {}
threads = []
results_lock = threading.Lock()
def eval_thread(test_name: str, rule: Dict[str, Any]):
"""Evaluate a single regular expression across all specified files."""
pattern = re.compile(rule["regex"])
- matches = []
+ tr = TestResult(name=test_name, matches=[])
for file_path in files:
with open(file_path, "r", encoding="utf-8") as f:
for lineno, line in enumerate(f, 1):
if pattern.search(line):
- matches.append(
- {
- "file": str(file_path),
- "line": lineno,
- "content": line.strip(),
- }
+ mr = MatchResult(
+ file=str(file_path),
+ line=lineno,
+ content=line.strip(),
)
+ tr.matches.append(mr)
+
with results_lock:
- results[test_name] = matches
+ results[test_name] = tr
for test_name, rule in test_str.items():
thread = threading.Thread(target=eval_thread, args=(test_name, rule))
@@ -231,70 +233,48 @@ def get_ratchet_path() -> str:
def evaluate_shell_tests(
files: List[Path], test_str: Dict[str, Dict[str, Any]]
-) -> Dict[str, List[Dict[str, Any]]]:
- """Evaluate all shell tests in parallel across each file."""
- if len(test_str) == 0:
+) -> Dict[str, TestResult]:
+ """Evaluate all shell tests in parallel."""
+ if not test_str:
raise Exception("No shell tests passed to evaluation method.")
- if len(files) == 0:
+ if not files:
raise Exception("No files passed to evaluation method.")
- results: Dict[str, List[Dict[str, Any]]] = {test_name: [] for test_name in test_str}
+ results: Dict[str, TestResult] = {
+ test_name: TestResult(name=test_name, matches=[]) for test_name in test_str
+ }
lock = threading.Lock()
- # we track each line number
- # for duplicates, popping them
- # as they are used, if they are
- # used.
-
- file_strs = list(map(str, files))
-
+ file_strs = [str(p) for p in files]
file_lines_map: Dict[str, Dict[str, List[int]]] = build_file_lines_map(file_strs)
def worker(test_name: str, shell_template: str, file_path: Path):
- """Evaluate an individual shell test for a given file."""
file_str = str(file_path)
cmd_str = f"echo {file_str} | {shell_template}"
-
try:
- result = subprocess.run(
+ res = subprocess.run(
cmd_str, shell=True, text=True, capture_output=True, timeout=5
)
-
- output = result.stdout.strip()
-
+ output = res.stdout.strip()
if output:
lines = output.splitlines()
-
with lock:
+ tr = results[test_name]
for line in lines:
-
content = line.rstrip("\n")
line_numbers = file_lines_map[file_str].get(content, [])
-
- # assume we found the last line this happened,
- # remove it, and repeat for each infraction of this line.
- # this can be wrong, but it is impossible to
- # solve the ambiguity of multiple lines matching,
- # but not causing infractions, which can happen
- # when shell commands are defined to consider multiple lines,
- # as can be the case with ast and such.
-
if line_numbers:
- ln = line_numbers[0]
- line_numbers.pop()
- results[test_name].append(
- {
- "file": file_str,
- "line": ln,
- "content": content,
- }
+ ln = line_numbers.pop(0)
+ mr = MatchResult(
+ file=file_str,
+ line=ln,
+ content=content,
)
-
+ tr.matches.append(mr)
except subprocess.TimeoutExpired:
raise Exception(f"Timeout while running test '{test_name}' on {file_path}")
threads = []
-
for test_name, test_dict in test_str.items():
shell_template = test_dict["command"]
for file_path in files:
@@ -311,19 +291,21 @@ def evaluate_shell_tests(
def results_to_json(
- results: Tuple[Dict[str, List[Dict[str, Any]]], Dict[str, List[Dict[str, Any]]]],
-) -> str:
- """Convert test results to a standard JSON formatted string."""
- test_issues, shell_issues = results
+ results: Tuple[Dict[str, TestResult], Dict[str, TestResult]],
+) -> Dict[str, int]:
+ """
+ Convert test results (regex and shell) to a JSON-serializable dict of counts.
+ """
+ regex_results, shell_results = results
counts: Dict[str, int] = {}
- for name, matches in test_issues.items():
- counts[name] = len(matches)
+ for name, tr in regex_results.items():
+ counts[name] = len(tr.matches)
- for name, matches in shell_issues.items():
- counts[name] = counts.get(name, 0) + len(matches)
+ for name, tr in shell_results.items():
+ counts[name] = counts.get(name, 0) + len(tr.matches)
- return json.dumps(counts, indent=2, sort_keys=True)
+ return counts
def update_ratchets(
@@ -343,77 +325,82 @@ def update_ratchets(
path = override_ratchet_path
with open(path, "w") as file:
- file.writelines(results_json)
+ file.write(json.dumps(results_json, indent=2))
def print_issues_with_blames(
- results: Tuple[Dict[str, List[Dict[str, Any]]], Dict[str, List[Dict[str, Any]]]],
+ results: Tuple[Dict[str, TestResult], Dict[str, TestResult]],
max_count: int,
) -> None:
- """Get blame results for each result and print in human readable format."""
- enriched_test_issues, enriched_shell_issues = add_blames(results)
-
- def _parse_time(ts: Optional[str]) -> datetime:
- """Internal method used to convert formatted strings to datetimes."""
- if not ts:
- return datetime.max
+ """
+ Enriches each TestResult with blame info, then prints in human-readable format.
+ Expects:
+ results: (regex_results, shell_results), each Dict[str, TestResult].
+ """
+ enriched_regex, enriched_shell = add_blames(results)
+
+ def _parse_time_obj(ts: Optional[Union[datetime, str]]) -> datetime:
+ """Convert datetime or ISO-string to datetime, with fallback."""
+ if ts is None:
+ return datetime.min
+ if isinstance(ts, datetime):
+ return ts
try:
return datetime.fromisoformat(ts)
except Exception:
- return datetime.max
+ return datetime.min
- def _print_section(
- section_name: str, issues_dict: Dict[str, List[Dict[str, Any]]]
- ) -> None:
- """Internal method used to print the results from an individual test."""
- for test_name, matches in issues_dict.items():
+ def _print_section(section_name: str, results_dict: Dict[str, TestResult]) -> None:
+ for test_name, tr in results_dict.items():
+ matches = tr.matches
if matches:
sorted_matches = sorted(
matches,
- key=lambda m: _parse_time(m.get("blame_time")),
+ key=lambda m: _parse_time_obj(m.blame_time),
reverse=True,
)
+ total = len(sorted_matches)
print()
print(
- f"{section_name} — {test_name} ({len(sorted_matches)}"
- + f" issue{'s' if len(sorted_matches) != 1 else ''}):"
+ f"{section_name} — {test_name}"
+ + f" ({total} issue{'s' if total != 1 else ''}):"
)
print()
- count = 0
-
- for match in sorted_matches:
- count += 1
-
- if count > max_count:
+ for i, m in enumerate(sorted_matches):
+ if i >= max_count:
break
-
- file_path = match.get("file", "<unknown>")
- line_no = match.get("line")
- content = match.get("content", "").strip()
+ file_path = m.file or "<unknown>"
+ line_no = m.line
+ content = (m.content or "").strip()
truncated = content if len(content) <= 80 else content[:80] + "..."
- author = match.get("blame_author") or "Unknown"
- ts = match.get("blame_time") or "Unknown"
-
+ author = m.blame_author or "Unknown"
+ ts_obj = m.blame_time
+ ts_str = (
+ ts_obj.isoformat()
+ if isinstance(ts_obj, datetime)
+ else (ts_obj or "Unknown")
+ )
if line_no is not None:
- print(f" -> {file_path}:{line_no} by {author} at {ts}")
+ print(f" -> {file_path}:{line_no} by {author} at {ts_str}")
print(f" {truncated}")
else:
print(
- f" -> {file_path} file last updated by {author} at {ts}"
+ f" -> {file_path} file last " +
+ "updated by {author} at {ts_str}"
)
print(f" {truncated}")
else:
print(f"\n{section_name} — {test_name}: no issues found.")
- _print_section("Regex Test", enriched_test_issues)
- _print_section("Shell Test", enriched_shell_issues)
+ _print_section("Regex Test", enriched_regex)
+ _print_section("Shell Test", enriched_shell)
def add_blames(
- results: Tuple[Dict[str, List[Dict[str, Any]]], Dict[str, List[Dict[str, Any]]]],
-) -> Tuple[Dict[str, List[Dict[str, Any]]], Dict[str, List[Dict[str, Any]]]]:
- """Add blame information to input results."""
- test_issues, shell_issues = results
+ results: Tuple[Dict[str, TestResult], Dict[str, TestResult]],
+) -> Tuple[Dict[str, TestResult], Dict[str, TestResult]]:
+ """Add blame information to TestResult inputs, enriching each MatchResult."""
+ regex_results, shell_results = results
try:
repo_root: Optional[str] = find_project_root()
@@ -424,20 +411,16 @@ def add_blames(
db = CachingDatabase(db_path)
new_records: List[BlameRecord] = []
- needs_blame: List[Tuple[Dict[str, Any], str, int, str]] = []
-
- # serial cache lookup as running this in
- # parallel imposes too much overhead for the minimal
- # cache lookup cost.
+ needs_blame: List[Tuple[MatchResult, str, int, str]] = []
- for issues in (test_issues, shell_issues):
- for test_name, matches in issues.items():
- for match in matches:
- file_path = match.get("file")
- line_content = match.get("content")
+ for results_dict in (regex_results, shell_results):
+ for _, tr in results_dict.items():
+ for m in tr.matches:
+ file_path = str(Path(m.file).resolve())
+ line_content = m.content
assert line_content is not None
- line_no = match.get("line")
+ line_no = m.line
if not file_path:
continue
if line_no is None:
@@ -446,14 +429,14 @@ def add_blames(
if repo_root is not None:
blame_res: Optional[BlameRecord] = db.get_blame(line_no, file_path)
if blame_res is not None and blame_res.line_content == line_content:
- match["blame_author"] = blame_res.author
- match["blame_time"] = (
- blame_res.timestamp.isoformat()
+ m.blame_author = blame_res.author
+ m.blame_time = (
+ blame_res.timestamp
if isinstance(blame_res.timestamp, datetime)
- else str(blame_res.timestamp)
+ else None
)
continue
- needs_blame.append((match, file_path, line_no, line_content))
+ needs_blame.append((m, file_path, line_no, line_content))
if needs_blame:
task_q = queue.Queue()
@@ -461,13 +444,15 @@ def add_blames(
task_q.put(item)
def worker():
- """Lookup"""
while True:
try:
- match, file_path, line_no, line_content = task_q.get(block=False)
+ m, file_path, line_no, line_content = task_q.get(block=False)
except queue.Empty:
break
- author, author_time = None, None
+
+ author: Optional[str] = None
+ parsed_time: Optional[datetime] = None
+
if repo_root is not None:
try:
cmd = [
@@ -486,42 +471,45 @@ def add_blames(
timeout=5,
)
if res.returncode == 0:
- parsed_author = None
- parsed_time = None
- for l in res.stdout.splitlines():
- if l.startswith("author "):
- parsed_author = l[len("author ") :].strip()
- elif l.startswith("author-time "):
- ts_int = int(l[len("author-time ") :].strip())
- parsed_time = datetime.fromtimestamp(ts_int)
- if (
- parsed_author is not None
- and parsed_time is not None
- ):
+ parsed_author: Optional[str] = None
+ parsed_ts: Optional[datetime] = None
+ for line in res.stdout.splitlines():
+ if line.startswith("author "):
+ parsed_author = line[len("author ") :].strip()
+ elif line.startswith("author-time "):
+ try:
+ ts_int = int(
+ line[len("author-time ") :].strip()
+ )
+ parsed_ts = datetime.fromtimestamp(ts_int)
+ except Exception:
+ parsed_ts = None
+ if parsed_author is not None and parsed_ts is not None:
break
- if parsed_author is not None and parsed_time is not None:
+ if parsed_author is not None and parsed_ts is not None:
author = parsed_author
- author_time = parsed_time.isoformat()
+ parsed_time = parsed_ts
new_records.append(
BlameRecord(
line_content=line_content,
line_number=int(line_no),
- timestamp=parsed_time,
+ timestamp=parsed_ts,
file_name=file_path,
author=parsed_author,
)
)
else:
- print(res.stderr)
+ print(res.stderr, end="")
except Exception:
pass
- match["blame_author"] = author
- match["blame_time"] = author_time
+
+ m.blame_author = author
+ m.blame_time = parsed_time
task_q.task_done()
num_tasks = task_q.qsize()
num_threads = min(MAX_THREADS, num_tasks) if num_tasks > 0 else 0
- threads = []
+ threads: List[threading.Thread] = []
for _ in range(num_threads):
t = threading.Thread(target=worker)
t.daemon = True
@@ -534,7 +522,7 @@ def add_blames(
if new_records:
db.create_or_update_blames(new_records)
- return test_issues, shell_issues
+ return regex_results, shell_results
def expand_paths(file_args: Optional[List[str]]) -> Optional[List[str]]:
@@ -689,18 +677,19 @@ def cli():
print_issues_with_blames(issues, max_count)
elif compare_counts:
issues = evaluate_tests(test_path, cmd_mode, regex_mode, paths)
- current_json = json.loads(results_to_json(issues))
+ current_json = results_to_json(issues)
previous_json = load_ratchet_results()
print_diff(current_json, previous_json)
elif update:
update_ratchets(test_path, cmd_mode, regex_mode, paths)
+ print("Ratchets updated successfully.")
elif verbose:
issues = evaluate_tests(test_path, cmd_mode, regex_mode, paths)
for issue_type in issues:
print_issues(issue_type)
else:
issues = evaluate_tests(test_path, cmd_mode, regex_mode, paths)
- current_json = json.loads(results_to_json(issues))
+ current_json = results_to_json(issues)
print("Current " + str(current_json))
previous_json = load_ratchet_results()
print("Previous: " + str(previous_json))
@@ -718,15 +707,13 @@ def process_file(file_path: str) -> Dict[str, List[int]]:
return file_map
-# After comparing this and a parallelized version; this runs faster.
+# After comparing this and a parallelized version, this runs faster.
# The parallel version used threading which imposed an overhead cost
# so it may be possible to speed this up, but it is not obvious.
def build_file_lines_map(files: List[str]) -> Dict[str, Dict[str, List[int]]]:
- """
- Process files serially, returning a dict mapping file_path to its line-content map.
- """
+ """Process files serially, returning a dict with line contents."""
file_lines_map: Dict[str, Dict[str, List[int]]] = {}
for fp in files:
try:
diff --git a/tests.toml b/tests.toml
@@ -69,7 +69,7 @@ valid = [
invalid = [
"def foo(): \n return 42\t"
]
-description = "Trailing whitespace is invisible but can cause noise in diffs and editors; trim trailing spaces/tabs."
+description = "Trailing whitespace is invisible but can cause noise in diffs and editors."
[ratchet.regex.ensure_trailing_newline]
@@ -96,7 +96,7 @@ invalid = [
"def foo():\n # todo: remove debug code",
"some_var = 'value' # FIXME: adjust value later"
]
-description = "Ensure no TODO or FIXME comments remain in code; address or file an issue instead of leaving these markers."
+description = "Ensure no TODO or FIXME comments remain in code."
[ratchet.shell.line_too_long]
diff --git a/tests/test_files/test_caching.py b/tests/test_files/test_caching.py
@@ -60,7 +60,7 @@ def test_record_updating():
updated = db.get_blame(line_number, file_name)
- assert updated is not None, "We inserted this record; it should not be 'None'"
+ assert updated is not None, "We inserted this record, it should not be 'None'"
assert updated.author == "Author2", "Single-record update failed"
assert updated.line_content == "print('Updated!')"
@@ -77,7 +77,7 @@ def test_record_updating():
updated = db.get_blame(line_number, file_name)
- assert updated is not None, "We inserted this record; it should not be 'None'"
+ assert updated is not None, "We inserted this record, it should not be 'None'"
assert updated.author == "Author3", "Batch-record update failed"
assert updated.line_content == "print('Batch update!')"
diff --git a/tests/test_files/test_files.py b/tests/test_files/test_files.py
@@ -29,8 +29,8 @@ def test_files():
toml_file, False, False, [filtered2_file], True
)
- json1 = json.loads(run_tests.results_to_json(exceptions1))
- json2 = json.loads(run_tests.results_to_json(exceptions2))
+ json1 = run_tests.results_to_json(exceptions1)
+ json2 = run_tests.results_to_json(exceptions2)
exception1_sum = 0
for key in json1: