Complete Python/NiceGUI rewrite of the Wirezone (Elixir/Phoenix) VPN management platform. All 10 implementation phases delivered.

Core stack:
- NiceGUI reactive UI with SQLModel ORM on PostgreSQL (asyncpg)
- Alembic migrations, Valkey/Redis cache, pydantic-settings config
- WireGuard management via subprocess (wg/ip/nft CLIs)
- 164 tests passing, 35% code coverage

Features:
- User/device/rule CRUD with admin and unprivileged roles
- Full device config form with per-device WG overrides
- WireGuard client config generation with QR codes
- REST API (v0) with Bearer token auth for all resources
- TOTP MFA with QR registration and challenge flow
- OIDC SSO with authlib (provider registry, auto-create users)
- Magic link passwordless sign-in via email
- SAML SP-initiated SSO with IdP metadata parsing
- WebAuthn/FIDO2 security key registration
- nftables firewall with per-user chains and masquerade
- Background tasks: WG stats polling, VPN session expiry, OIDC token refresh, WAN connectivity checks
- Startup reconciliation (DB ↔ WireGuard state sync)
- In-memory notification system with header badge
- Admin UI: users, devices, rules, settings (3 tabs), diagnostics
- Loguru logging with optional timestamped file output

Deployment:
- Multi-stage Dockerfile (python:3.13-slim)
- Docker Compose prod stack (bridge networking, NET_ADMIN, nftables)
- Forgejo CI: tests → semantic versioning → Docker registry push
- Health endpoint at /api/health
86 lines
2.3 KiB
Python
86 lines
2.3 KiB
Python
"""Tests for REST API endpoints and token auth."""
|
|
|
|
import hashlib
|
|
|
|
from wiregui.auth.api_token import generate_api_token, resolve_bearer_token
|
|
from wiregui.auth.passwords import hash_password
|
|
from wiregui.models.api_token import ApiToken
|
|
from wiregui.models.user import User
|
|
from wiregui.utils.time import utcnow
|
|
|
|
|
|
# --- Token generation ---
|
|
|
|
|
|
def test_generate_api_token():
    """Check the plaintext length and that the hash is its SHA-256 hex digest."""
    raw_token, stored_digest = generate_api_token()

    # The plaintext must be longer than 20 characters.
    assert len(raw_token) > 20

    # The returned hash must equal the SHA-256 hex digest of the encoded plaintext.
    expected_digest = hashlib.sha256(raw_token.encode()).hexdigest()
    assert stored_digest == expected_digest
|
|
|
|
|
|
def test_generate_api_token_unique():
    """Two consecutive calls must differ in both plaintext and hash."""
    (plain_a, digest_a), (plain_b, digest_b) = (
        generate_api_token(),
        generate_api_token(),
    )

    assert plain_a != plain_b
    assert digest_a != digest_b
|
|
|
|
|
|
# --- Token resolution ---
|
|
|
|
|
|
async def test_resolve_valid_token(session):
    """The plaintext of a stored token resolves to the user it was created for."""
    owner = User(
        email="api-user@example.com",
        password_hash=hash_password("x"),
        role="admin",
    )
    session.add(owner)
    await session.flush()  # flushed before owner.id is read below

    secret, digest = generate_api_token()
    session.add(ApiToken(token_hash=digest, user_id=owner.id))
    await session.flush()

    found = await resolve_bearer_token(session, secret)
    assert found is not None
    assert found.id == owner.id
|
|
|
|
|
|
async def test_resolve_invalid_token(session):
    """A plaintext with no matching stored token resolves to None."""
    assert await resolve_bearer_token(session, "bogus-token") is None
|
|
|
|
|
|
async def test_resolve_expired_token(session):
    """A token whose expires_at is one hour in the past resolves to None."""
    from datetime import timedelta

    owner = User(email="expired-api@example.com", password_hash=hash_password("x"))
    session.add(owner)
    await session.flush()

    secret, digest = generate_api_token()
    an_hour_ago = utcnow() - timedelta(hours=1)
    stale_token = ApiToken(
        token_hash=digest,
        user_id=owner.id,
        expires_at=an_hour_ago,
    )
    session.add(stale_token)
    await session.flush()

    outcome = await resolve_bearer_token(session, secret)
    assert outcome is None
|
|
|
|
|
|
async def test_resolve_token_disabled_user(session):
    """A token belonging to a user with disabled_at set resolves to None."""
    disabled_owner = User(
        email="disabled-api@example.com",
        password_hash=hash_password("x"),
        disabled_at=utcnow(),
    )
    session.add(disabled_owner)
    await session.flush()

    secret, digest = generate_api_token()
    session.add(ApiToken(token_hash=digest, user_id=disabled_owner.id))
    await session.flush()

    assert await resolve_bearer_token(session, secret) is None
|