wiregui/tests/test_account.py
Stefano Bertelli 0546b44507
Some checks failed
CI / test (push) Failing after 26s
CI / release (push) Has been skipped
CI / docker (push) Has been skipped
feat: initial WireGUI implementation — full VPN management platform
Complete Python/NiceGUI rewrite of the Wirezone (Elixir/Phoenix) VPN
management platform. All 10 implementation phases delivered.

Core stack:
- NiceGUI reactive UI with SQLModel ORM on PostgreSQL (asyncpg)
- Alembic migrations, Valkey/Redis cache, pydantic-settings config
- WireGuard management via subprocess (wg/ip/nft CLIs)
- 164 tests passing, 35% code coverage

Features:
- User/device/rule CRUD with admin and unprivileged roles
- Full device config form with per-device WG overrides
- WireGuard client config generation with QR codes
- REST API (v0) with Bearer token auth for all resources
- TOTP MFA with QR registration and challenge flow
- OIDC SSO with authlib (provider registry, auto-create users)
- Magic link passwordless sign-in via email
- SAML SP-initiated SSO with IdP metadata parsing
- WebAuthn/FIDO2 security key registration
- nftables firewall with per-user chains and masquerade
- Background tasks: WG stats polling, VPN session expiry,
  OIDC token refresh, WAN connectivity checks
- Startup reconciliation (DB ↔ WireGuard state sync)
- In-memory notification system with header badge
- Admin UI: users, devices, rules, settings (3 tabs), diagnostics
- Loguru logging with optional timestamped file output

Deployment:
- Multi-stage Dockerfile (python:3.13-slim)
- Docker Compose prod stack (bridge networking, NET_ADMIN, nftables)
- Forgejo CI: tests → semantic versioning → Docker registry push
- Health endpoint at /api/health
2026-03-30 16:53:46 -05:00

161 lines
4.6 KiB
Python

"""Tests for account functionality — password changes, API tokens, OIDC connections."""
import hashlib
from datetime import timedelta
from sqlmodel import func, select
from wiregui.auth.api_token import generate_api_token
from wiregui.auth.passwords import hash_password, verify_password
from wiregui.models.api_token import ApiToken
from wiregui.models.oidc_connection import OIDCConnection
from wiregui.models.user import User
from wiregui.utils.time import utcnow
# --- Password change ---
async def test_password_change_flow(session):
    """Simulate the password change flow: verify the old password, set a new one.

    The user is reloaded from the database before the final assertions so
    that they check the flushed row rather than the in-memory attribute
    that was just assigned.
    """
    user = User(
        email="pw-change@example.com",
        password_hash=hash_password("old-password"),
    )
    session.add(user)
    await session.flush()

    # Verify old password
    assert verify_password("old-password", user.password_hash) is True

    # Change password. The instance was added and flushed above, so mutating
    # it and flushing again is enough; no second session.add() is needed.
    user.password_hash = hash_password("new-password")
    await session.flush()

    # session.get() alone would return the identity-mapped instance without
    # emitting SQL, so refresh first to load the column values from the row.
    await session.refresh(user)
    fetched = await session.get(User, user.id)
    assert fetched is not None
    assert verify_password("new-password", fetched.password_hash) is True
    assert verify_password("old-password", fetched.password_hash) is False
async def test_password_change_wrong_current(session):
    """A wrong current password must fail verification, so no change may proceed."""
    account = User(
        email="pw-wrong@example.com",
        password_hash=hash_password("correct"),
    )
    session.add(account)
    await session.flush()

    # Stand-in for the guard a password-change handler would run first.
    matches = verify_password("wrong", account.password_hash)
    assert matches is False
# --- API token management ---
async def test_create_multiple_tokens(session):
    """One user can hold several API tokens at the same time."""
    owner = User(email="multi-token@example.com")
    session.add(owner)
    await session.flush()

    for _ in range(3):
        # Only the second element (the hash) of the generated pair is stored.
        hashed = generate_api_token()[1]
        session.add(ApiToken(token_hash=hashed, user_id=owner.id))
    await session.flush()

    count_query = (
        select(func.count())
        .select_from(ApiToken)
        .where(ApiToken.user_id == owner.id)
    )
    result = await session.execute(count_query)
    assert result.scalar() == 3
async def test_token_with_expiry(session):
    """A token created with an expiry 30 days out reports a future ``expires_at``."""
    owner = User(email="expiry-token@example.com")
    session.add(owner)
    await session.flush()

    hashed = generate_api_token()[1]
    api_token = ApiToken(
        token_hash=hashed,
        expires_at=utcnow() + timedelta(days=30),
        user_id=owner.id,
    )
    session.add(api_token)
    await session.flush()

    stored = await session.get(ApiToken, api_token.id)
    expiry = stored.expires_at
    assert expiry is not None
    assert expiry > utcnow()
async def test_delete_token(session):
    """After delete and flush, the token can no longer be fetched by primary key."""
    owner = User(email="del-token@example.com")
    session.add(owner)
    await session.flush()

    hashed = generate_api_token()[1]
    doomed = ApiToken(token_hash=hashed, user_id=owner.id)
    session.add(doomed)
    await session.flush()

    await session.delete(doomed)
    await session.flush()

    lookup = await session.get(ApiToken, doomed.id)
    assert lookup is None
# --- OIDC connections ---
async def test_oidc_connection_create(session):
    """An OIDC connection is stored with its provider, refresh token and response."""
    owner = User(email="oidc-conn@example.com")
    session.add(owner)
    await session.flush()

    token_payload = {"access_token": "at", "token_type": "Bearer"}
    link = OIDCConnection(
        provider="google",
        refresh_token="refresh-tok-123",
        refresh_response=token_payload,
        refreshed_at=utcnow(),
        user_id=owner.id,
    )
    session.add(link)
    await session.flush()

    # scalar_one() also asserts that exactly one connection exists for the user.
    by_user = select(OIDCConnection).where(OIDCConnection.user_id == owner.id)
    result = await session.execute(by_user)
    stored = result.scalar_one()

    assert stored.provider == "google"
    assert stored.refresh_token == "refresh-tok-123"
    assert stored.refresh_response["access_token"] == "at"
async def test_multiple_oidc_providers(session):
    """A user can be linked to several OIDC providers simultaneously."""
    owner = User(email="multi-oidc@example.com")
    session.add(owner)
    await session.flush()

    provider_names = ["google", "okta", "azure"]
    for name in provider_names:
        session.add(OIDCConnection(provider=name, user_id=owner.id))
    await session.flush()

    count_query = (
        select(func.count())
        .select_from(OIDCConnection)
        .where(OIDCConnection.user_id == owner.id)
    )
    result = await session.execute(count_query)
    assert result.scalar() == 3
async def test_oidc_connection_update_refresh_token(session):
    """Updating a connection's refresh token writes the new value and timestamp.

    The connection is reloaded from the database before the final assertions
    so that they check the flushed row rather than the attributes that were
    just assigned in memory.
    """
    user = User(email="oidc-refresh@example.com")
    session.add(user)
    await session.flush()

    conn = OIDCConnection(
        provider="google",
        refresh_token="old-token",
        user_id=user.id,
    )
    session.add(conn)
    await session.flush()

    # The instance was added and flushed above, so mutating it and flushing
    # again is enough; no second session.add() is needed.
    conn.refresh_token = "new-token"
    conn.refreshed_at = utcnow()
    await session.flush()

    # session.get() alone would return the identity-mapped instance without
    # emitting SQL, so refresh first to load the column values from the row.
    await session.refresh(conn)
    fetched = await session.get(OIDCConnection, conn.id)
    assert fetched is not None
    assert fetched.refresh_token == "new-token"
    assert fetched.refreshed_at is not None