Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion src/cfengine_cli/lint.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- *.cf (policy files)
- cfbs.json (CFEngine Build project files)
- *.json (basic JSON syntax checking)
- *.csv (basic CSV syntax + RFC 4180 CRLF record terminator check)

This is performed in 3 steps:
1. Parsing - Read the .cf files and convert them into syntax trees
Expand Down Expand Up @@ -38,9 +39,10 @@
from cfbs.validate import validate_config
from cfbs.cfbs_config import CFBSConfig
from cfbs.utils import find
from cfengine_cli.lint_csv import check_csv_file
from cfengine_cli.utils import UserError

LINT_EXTENSIONS = (".cf", ".cf.sub", ".json")
LINT_EXTENSIONS = (".cf", ".cf.sub", ".json", ".csv")
DEFAULT_NAMESPACE = "default"
VARS_TYPES = {
"data",
Expand Down Expand Up @@ -1191,6 +1193,9 @@ def _lint_main(
if filename.endswith(".json"):
errors += _lint_json_selector(filename)
continue
if filename.endswith(".csv"):
errors += _lint_csv(filename)
continue
assert filename.endswith((".cf", ".cf.sub"))
policy_file = PolicyFile(filename, snippet)
r = _check_syntax(policy_file, state)
Expand Down Expand Up @@ -1328,6 +1333,19 @@ def _lint_json_selector(file: str) -> int:
return _lint_json_plain(file)


def _lint_csv(filename: str) -> int:
    """Lint a single CSV file and report the outcome on stdout.

    The file is handed to ``check_csv_file``; if that returns a problem
    description, it is printed prefixed with the filename. The pass/fail
    summary line for the file is always printed afterwards.

    Returns the number of errors found: 0 when the file is clean, 1 otherwise.
    """
    assert os.path.isfile(filename)
    problem = check_csv_file(filename)
    failed = problem is not None
    if failed:
        print(f"{filename}: {problem}")
    error_count = int(failed)
    print(_pass_fail_filename(filename, error_count))
    return error_count


# ---------------------------------------------------------------------------
# Syntax error detection (used by both linter and formatter)
# ---------------------------------------------------------------------------
Expand Down
52 changes: 52 additions & 0 deletions src/cfengine_cli/lint_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""CSV file validation per RFC 4180.

The grammar in RFC 4180 mandates CRLF between records. Bare \\r or \\n
outside of quoted fields is not valid. Inside quoted fields, \\r and \\n
are allowed as field content.
"""

import csv
import io


def check_csv_record_terminators(raw: str) -> str | None:
"""Check that all record terminators in a CSV string are CRLF.

Returns None if all record terminators are CRLF, otherwise a short
description of the problem.
"""
in_quotes = False
_prev = None
prev = None
for current in raw:
prev = _prev
_prev = current
if current == '"':
in_quotes = not in_quotes
continue
if in_quotes:
continue
if current == "\n" and prev != "\r":
return "bare LF outside quoted field"
if prev == "\r" and current != "\n":
return "bare CR outside quoted field"
if _prev == "\r" and not in_quotes:
return "bare CR outside quoted field"
return None


def check_csv_file(filename: str) -> str | None:
"""Check a CSV file: parses, has at least one non-empty record, and uses
CRLF record terminators.

Returns None if valid, otherwise a short description of the problem.
"""
try:
with open(filename, newline="") as f:
raw = f.read()
with open(filename, newline="") as f:
rows = list(csv.reader(f, strict=True))
except (OSError, csv.Error) as e:
return str(e)
if not any(rows):
return "no records"
return check_csv_record_terminators(raw)
46 changes: 46 additions & 0 deletions tests/shell/005-lint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#!/bin/bash

# Shell test for `cfengine lint`: creates one empty and one malformed file of
# each kind (.json, .csv, .cf), lints the containing folder, and checks that
# every file is reported as failing and that the total error count is 6.

set -e
set -x

# Setup: create a temp directory for test files
tmpdir=$(mktemp -d)
output_file=$(mktemp)
# Single quotes defer expansion until the trap fires; the inner double quotes
# keep the paths intact even if they contain whitespace.
trap 'rm -rf "$tmpdir" "$output_file"' EXIT

# Empty JSON file
printf "" > "$tmpdir/empty.json"

# Empty CSV file
printf "" > "$tmpdir/empty.csv"

# Empty policy file
printf "" > "$tmpdir/empty.cf"

# CSV with LF-only line endings
printf 'a,b,c\n1,2,3\n' > "$tmpdir/bad.csv"

# JSON with just some characters
printf 'abc\n' > "$tmpdir/bad.json"

# Policy file with just some characters
printf 'abc\n' > "$tmpdir/bad.cf"

# Run lint on the folder - expect non-zero exit
if cfengine lint "$tmpdir" > "$output_file" 2>&1; then
    cat "$output_file"
    echo "FAIL: expected lint to fail, but it succeeded"
    exit 1
fi
cat "$output_file"

# Verify each file is reported as failing
# (dots are escaped so they only match a literal "." in the filename)
grep -q "FAIL:.*empty\.json" "$output_file"
grep -q "FAIL:.*empty\.csv" "$output_file"
grep -q "FAIL:.*empty\.cf" "$output_file"
grep -q "FAIL:.*bad\.csv" "$output_file"
grep -q "FAIL:.*bad\.json" "$output_file"
grep -q "FAIL:.*bad\.cf" "$output_file"

# Verify total error count is 6
grep -q "Failure, 6 errors in total" "$output_file"
73 changes: 73 additions & 0 deletions tests/unit/test_lint_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import os
import tempfile

from cfengine_cli.lint_csv import check_csv_file, check_csv_record_terminators


def _write_temp_csv(content: bytes) -> str:
fd, path = tempfile.mkstemp(suffix=".csv")
with os.fdopen(fd, "wb") as f:
f.write(content)
return path


# (name, raw file content) fixtures that check_csv_file is expected to accept,
# i.e. return None for. The name is only used in assertion failure messages.
VALID = [
("crlf_terminated", b"a,b,c\r\n1,2,3\r\n"),
("crlf_no_trailing_newline", b"a,b,c\r\n1,2,3"),
("single_record_no_newline", b"a,b,c"),
("row_of_empty_fields", b",,\r\n"),
# CR / LF / CRLF inside a quoted field are field content, not terminators.
("lf_inside_quoted_field", b'a,"line1\nline2",c\r\n'),
("cr_inside_quoted_field", b'a,"line1\rline2",c\r\n'),
("crlf_inside_quoted_field", b'a,"line1\r\nline2",c\r\n'),
("escaped_quote_inside_field", b'a,"he said ""hi""",c\r\n'),
]

# (name, raw file content) fixtures that check_csv_file is expected to reject,
# i.e. return a problem description for.
INVALID = [
# Files with no records at all.
("empty_file", b""),
("only_one_empty_line", b"\r\n"),
("only_empty_lines", b"\r\n\r\n\r\n"),
# Files whose record terminators are not CRLF.
("lf_only_line_endings", b"a,b,c\n1,2,3\n"),
("cr_only_line_endings", b"a,b,c\r1,2,3\r"),
("mixed_crlf_then_bare_lf", b"a,b,c\r\n1,2,3\nx,y,z\r\n"),
("bare_cr_mid_record", b"a,b\rc,d\r\n"),
("trailing_bare_cr", b"a,b,c\r"),
("trailing_bare_lf", b"a,b,c\n"),
]


def test_check_csv_file_accepts_valid():
    """Every fixture in VALID must make check_csv_file return None."""
    for label, payload in VALID:
        tmp_path = _write_temp_csv(payload)
        try:
            result = check_csv_file(tmp_path)
        finally:
            # Remove the temp file whether or not the check raised.
            os.unlink(tmp_path)
        assert result is None, f"Expected valid: {label}"


def test_check_csv_file_rejects_invalid():
    """Every fixture in INVALID must make check_csv_file report a problem."""
    for label, payload in INVALID:
        tmp_path = _write_temp_csv(payload)
        try:
            result = check_csv_file(tmp_path)
        finally:
            # Remove the temp file whether or not the check raised.
            os.unlink(tmp_path)
        assert result is not None, f"Expected invalid: {label}"


def test_check_csv_record_terminators_accepts_crlf():
    """Records separated and terminated by CRLF produce no complaint."""
    outcome = check_csv_record_terminators("a,b\r\nc,d\r\n")
    assert outcome is None


def test_check_csv_record_terminators_allows_newlines_inside_quotes():
    """LF, CR and CRLF inside a quoted field are not treated as terminators."""
    quoted_field_with_newlines = '"a\nb\rc\r\nd"\r\n'
    outcome = check_csv_record_terminators(quoted_field_with_newlines)
    assert outcome is None


def test_check_csv_record_terminators_rejects_bare_lf():
    """LF-only record terminators are reported as a bare LF."""
    outcome = check_csv_record_terminators("a,b\nc,d\n")
    assert outcome == "bare LF outside quoted field"


def test_check_csv_record_terminators_rejects_bare_cr():
    """A CR in the middle of the data with no following LF is reported."""
    outcome = check_csv_record_terminators("a,b\rc,d")
    assert outcome == "bare CR outside quoted field"


def test_check_csv_record_terminators_rejects_trailing_bare_cr():
    """A CR as the very last character of the input is reported."""
    outcome = check_csv_record_terminators("a,b\r")
    assert outcome == "bare CR outside quoted field"
Loading