feat: IdP provisioning from YAML file + Playwright e2e tests
Some checks failed
CI / test (push) Failing after 1m52s
CI / release (push) Has been skipped
CI / docker (push) Has been skipped

Add WG_IDP_CONFIG_FILE env var to seed OIDC/SAML identity providers
from a YAML file at startup, enabling GitOps and IaC workflows.
Providers are upserted by id (merge strategy preserves manual additions).

Convert all e2e tests from NiceGUI User fixture to Playwright async API
with --headed and --slowmo flags for visual debugging. Add full OIDC
login flow test against the mock-oidc service.
This commit is contained in:
Stefano Bertelli 2026-03-31 14:23:31 -05:00
parent c9ef58a244
commit 3bf6fabcff
13 changed files with 940 additions and 332 deletions

View file

@ -1,6 +1,12 @@
"""E2E test configuration — loads NiceGUI testing plugin and app."""
"""E2E test configuration — async Playwright browser tests against a running app."""
import os
import subprocess
import time
import pytest
import pytest_asyncio
from playwright.async_api import Browser, Page, async_playwright
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
from sqlmodel import select
@ -11,22 +17,28 @@ from wiregui.db import async_session
from wiregui.models.configuration import Configuration
from wiregui.models.user import User
pytest_plugins = ["nicegui.testing.user_plugin"]
FAKE_SERVER_KEY = "SFake0ServerPubKey0000000000000000000000000w="
TEST_EMAIL = "e2e-test@example.com"
TEST_PASSWORD = "testpass123"
# Dedicated port so we don't conflict with a dev instance on 13000
TEST_APP_PORT = 13001
TEST_APP_BASE = f"http://localhost:{TEST_APP_PORT}"
_CHILD_TABLES = ("devices", "rules", "mfa_methods", "api_tokens", "oidc_connections")
async def _cleanup_test_user():
"""Delete the test user and all related objects using a fresh engine."""
def pytest_addoption(parser):
parser.addoption("--headed", action="store_true", default=False, help="Run browser in headed mode")
parser.addoption("--slowmo", type=int, default=0, help="Slow down Playwright actions by ms")
async def _cleanup_user_by_email(email: str):
"""Delete a user and all related objects by email."""
engine = create_async_engine(get_settings().database_url)
async with engine.begin() as conn:
# Find user id by email
row = (await conn.execute(
text("SELECT id FROM users WHERE email = :email"), {"email": TEST_EMAIL}
text("SELECT id FROM users WHERE email = :email"), {"email": email}
)).first()
if row:
uid = row[0]
@ -36,14 +48,90 @@ async def _cleanup_test_user():
await engine.dispose()
@pytest.fixture
async def test_user():
"""Create a test user and ensure server config has a public key."""
# Clean up any leftover from a previous failed run
async def _cleanup_test_user():
await _cleanup_user_by_email(TEST_EMAIL)
# ---------------------------------------------------------------------------
# App subprocess — shared across all e2e tests in the session
# ---------------------------------------------------------------------------
@pytest.fixture(scope="session")
def app_server():
"""Start WireGUI on TEST_APP_PORT for the entire test session."""
import httpx
env = os.environ.copy()
env["WG_LOG_TO_FILE"] = "false"
env["WG_PORT"] = str(TEST_APP_PORT)
env["WG_EXTERNAL_URL"] = TEST_APP_BASE
env.pop("PYTEST_CURRENT_TEST", None)
env.pop("NICEGUI_SCREEN_TEST_PORT", None)
proc = subprocess.Popen(
["uv", "run", "python", "-m", "wiregui.main"],
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
for _ in range(30):
try:
r = httpx.get(f"{TEST_APP_BASE}/api/health", timeout=1)
if r.status_code == 200:
break
except Exception:
pass
time.sleep(1)
else:
proc.kill()
out = proc.stdout.read().decode() if proc.stdout else ""
pytest.fail(f"App did not start in time. Output:\n{out}")
yield proc
proc.terminate()
proc.wait(timeout=10)
# ---------------------------------------------------------------------------
# Playwright browser — session-scoped, one browser for all tests
# ---------------------------------------------------------------------------
@pytest_asyncio.fixture(scope="session")
async def browser(request):
"""Launch a Playwright Chromium browser for the session."""
headed = request.config.getoption("--headed")
slowmo = request.config.getoption("--slowmo")
pw = await async_playwright().start()
br = await pw.chromium.launch(headless=not headed, slow_mo=slowmo)
yield br
await br.close()
await pw.stop()
@pytest_asyncio.fixture
async def page(browser: Browser):
"""Create a fresh browser context + page per test (isolated cookies/storage)."""
context = await browser.new_context()
pg = await context.new_page()
yield pg
await context.close()
# ---------------------------------------------------------------------------
# Test user fixture
# ---------------------------------------------------------------------------
@pytest_asyncio.fixture
async def test_user(app_server):
"""Create a test admin user, yield it, clean up after."""
await _cleanup_test_user()
async with async_session() as session:
# Ensure a Configuration with a server key exists
config = (await session.execute(select(Configuration).limit(1))).scalar_one_or_none()
if config:
if not config.server_public_key:
@ -65,3 +153,17 @@ async def test_user():
yield user
await _cleanup_test_user()
# ---------------------------------------------------------------------------
# Playwright helpers
# ---------------------------------------------------------------------------
async def login(page: Page, email: str = TEST_EMAIL, password: str = TEST_PASSWORD):
"""Fill the login form and submit."""
await page.goto(f"{TEST_APP_BASE}/login")
await page.wait_for_load_state("networkidle")
await page.locator("input[aria-label='Email']").fill(email)
await page.locator("input[aria-label='Password']").fill(password)
await page.get_by_role("button", name="Sign in", exact=True).click()

View file

@ -1,124 +1,85 @@
"""End-to-end tests for account page — password, TOTP, API tokens, deletion."""
"""End-to-end tests for account page — password, API tokens, TOTP, deletion."""
from unittest.mock import patch
import pytest
from nicegui import ui
from nicegui.testing import User
from playwright.async_api import Page, expect
from sqlmodel import select
from wiregui.auth.passwords import hash_password
from wiregui.db import async_session
from wiregui.models.user import User as UserModel
from tests.e2e.conftest import TEST_EMAIL, TEST_PASSWORD
from tests.e2e.conftest import TEST_APP_BASE, TEST_EMAIL, TEST_PASSWORD, login
async def _login(user: User):
async def _login_to_account(page: Page):
"""Log in and navigate to account page."""
await user.open("/login")
user.find("Email").type(TEST_EMAIL)
user.find("Password").type(TEST_PASSWORD)
user.find("Sign in").click()
await user.should_see("My Devices")
await user.open("/account")
await user.should_see("Account Settings")
await login(page)
await expect(page.get_by_text("My Devices")).to_be_visible(timeout=10_000)
await page.goto(f"{TEST_APP_BASE}/account")
await expect(page.get_by_text("Account Settings")).to_be_visible(timeout=10_000)
@pytest.mark.parametrize("user", [{"storage": {}}], indirect=True)
async def test_change_password(user: User, test_user: UserModel):
"""Test changing password: fill form, submit, verify success."""
await _login(user)
user.find("Current Password").type(TEST_PASSWORD)
user.find("New Password").type("newpass12345")
user.find("Confirm Password").type("newpass12345")
user.find("Update Password").click()
await user.should_see("Password changed")
async def test_change_password(page: Page, test_user: UserModel):
await _login_to_account(page)
await page.locator("input[aria-label='Current Password']").fill(TEST_PASSWORD)
await page.locator("input[aria-label='New Password']").fill("newpass12345")
await page.locator("input[aria-label='Confirm Password']").fill("newpass12345")
await page.get_by_role("button", name="Update Password").click()
await expect(page.get_by_text("Password changed")).to_be_visible(timeout=5_000)
@pytest.mark.parametrize("user", [{"storage": {}}], indirect=True)
async def test_change_password_wrong_current(user: User, test_user: UserModel):
"""Test that wrong current password is rejected."""
await _login(user)
user.find("Current Password").type("wrongpassword")
user.find("New Password").type("newpass12345")
user.find("Confirm Password").type("newpass12345")
user.find("Update Password").click()
await user.should_see("Wrong current password")
async def test_change_password_wrong_current(page: Page, test_user: UserModel):
await _login_to_account(page)
await page.locator("input[aria-label='Current Password']").fill("wrongpassword")
await page.locator("input[aria-label='New Password']").fill("newpass12345")
await page.locator("input[aria-label='Confirm Password']").fill("newpass12345")
await page.get_by_role("button", name="Update Password").click()
await expect(page.get_by_text("Wrong current password")).to_be_visible(timeout=5_000)
@pytest.mark.parametrize("user", [{"storage": {}}], indirect=True)
async def test_change_password_mismatch(user: User, test_user: UserModel):
"""Test that mismatched passwords are rejected."""
await _login(user)
user.find("Current Password").type(TEST_PASSWORD)
user.find("New Password").type("newpass12345")
user.find("Confirm Password").type("differentpass")
user.find("Update Password").click()
await user.should_see("Passwords don't match")
async def test_change_password_mismatch(page: Page, test_user: UserModel):
await _login_to_account(page)
await page.locator("input[aria-label='Current Password']").fill(TEST_PASSWORD)
await page.locator("input[aria-label='New Password']").fill("newpass12345")
await page.locator("input[aria-label='Confirm Password']").fill("differentpass")
await page.get_by_role("button", name="Update Password").click()
await expect(page.get_by_text("Passwords don't match")).to_be_visible(timeout=5_000)
@pytest.mark.parametrize("user", [{"storage": {}}], indirect=True)
async def test_change_password_too_short(user: User, test_user: UserModel):
"""Test that short passwords are rejected."""
await _login(user)
user.find("Current Password").type(TEST_PASSWORD)
user.find("New Password").type("short")
user.find("Confirm Password").type("short")
user.find("Update Password").click()
await user.should_see("Min 8 characters")
async def test_change_password_too_short(page: Page, test_user: UserModel):
await _login_to_account(page)
await page.locator("input[aria-label='Current Password']").fill(TEST_PASSWORD)
await page.locator("input[aria-label='New Password']").fill("short")
await page.locator("input[aria-label='Confirm Password']").fill("short")
await page.get_by_role("button", name="Update Password").click()
await expect(page.get_by_text("Min 8 characters")).to_be_visible(timeout=5_000)
@pytest.mark.parametrize("user", [{"storage": {}}], indirect=True)
async def test_create_api_token(user: User, test_user: UserModel):
"""Test creating an API token and seeing the copy banner."""
await _login(user)
await user.should_see("No API tokens.")
user.find("Add API Token").click()
await user.should_see("Copy now")
async def test_create_api_token(page: Page, test_user: UserModel):
await _login_to_account(page)
await expect(page.get_by_text("No API tokens.")).to_be_visible()
await page.get_by_role("button", name="Add API Token").click()
await expect(page.get_by_text("Copy now")).to_be_visible(timeout=5_000)
@pytest.mark.parametrize("user", [{"storage": {}}], indirect=True)
async def test_totp_registration_flow(user: User, test_user: UserModel):
async def test_totp_registration_flow(page: Page, test_user: UserModel):
"""Test starting TOTP registration shows QR and verify form."""
with patch("wiregui.pages.account.generate_totp_secret", return_value="JBSWY3DPEHPK3PXP"), \
patch("wiregui.pages.account.generate_totp_qr_svg", return_value='<svg></svg>'), \
patch("wiregui.pages.account.get_totp_uri", return_value="otpauth://totp/WireGUI:test?secret=JBSWY3DPEHPK3PXP"):
await _login(user)
await user.should_see("No MFA methods configured.")
user.find("Add TOTP Method").click()
await user.should_see("Register TOTP Authenticator")
await user.should_see("JBSWY3DPEHPK3PXP")
await user.should_see("Verify & Save")
await _login_to_account(page)
await expect(page.get_by_text("No MFA methods configured.")).to_be_visible()
await page.get_by_role("button", name="Add TOTP Method").click()
await expect(page.get_by_text("Register TOTP Authenticator")).to_be_visible(timeout=5_000)
await expect(page.get_by_role("button", name="Verify & Save")).to_be_visible()
@pytest.mark.parametrize("user", [{"storage": {}}], indirect=True)
async def test_totp_verify_invalid_code(user: User, test_user: UserModel):
"""Test that an invalid TOTP code is rejected."""
with patch("wiregui.pages.account.generate_totp_secret", return_value="JBSWY3DPEHPK3PXP"), \
patch("wiregui.pages.account.generate_totp_qr_svg", return_value='<svg></svg>'), \
patch("wiregui.pages.account.get_totp_uri", return_value="otpauth://totp/WireGUI:test?secret=JBSWY3DPEHPK3PXP"):
await _login(user)
user.find("Add TOTP Method").click()
await user.should_see("Register TOTP Authenticator")
user.find("6-digit verification code").type("000000")
user.find("Verify & Save").click()
await user.should_see("Invalid code")
async def test_totp_verify_invalid_code(page: Page, test_user: UserModel):
await _login_to_account(page)
await page.get_by_role("button", name="Add TOTP Method").click()
await expect(page.get_by_text("Register TOTP Authenticator")).to_be_visible(timeout=5_000)
await page.locator("input[aria-label='6-digit verification code']").fill("000000")
await page.get_by_role("button", name="Verify & Save").click()
await expect(page.get_by_text("Invalid code")).to_be_visible(timeout=5_000)
@pytest.mark.parametrize("user", [{"storage": {}}], indirect=True)
async def test_delete_account(user: User, test_user: UserModel):
async def test_delete_account(page: Page, test_user: UserModel):
"""Test account deletion flow with email confirmation."""
# Create a second admin first so deletion is allowed
from wiregui.db import async_session
from wiregui.auth.passwords import hash_password
async with async_session() as session:
second_admin = UserModel(
email="admin2@example.com",
@ -129,21 +90,17 @@ async def test_delete_account(user: User, test_user: UserModel):
await session.commit()
try:
await _login(user)
user.find("Delete Your Account").click()
await user.should_see("Delete Your Account?")
user.find(ui.input).type(TEST_EMAIL)
user.find("Delete My Account").click()
# Should redirect to login
await user.should_see("Sign in")
await _login_to_account(page)
await page.get_by_role("button", name="Delete Your Account").click()
await expect(page.get_by_text("Delete Your Account?")).to_be_visible(timeout=5_000)
await page.locator(".q-dialog input").fill(TEST_EMAIL)
await page.get_by_role("button", name="Delete My Account").click()
await expect(page.get_by_role("button", name="Sign in", exact=True)).to_be_visible(timeout=10_000)
finally:
# Clean up second admin
async with async_session() as session:
from sqlmodel import select
a2 = (await session.execute(select(UserModel).where(UserModel.email == "admin2@example.com"))).scalar_one_or_none()
a2 = (await session.execute(
select(UserModel).where(UserModel.email == "admin2@example.com")
)).scalar_one_or_none()
if a2:
await session.delete(a2)
await session.commit()

View file

@ -0,0 +1,208 @@
"""End-to-end tests for admin user management page."""
import pytest
import pytest_asyncio
from playwright.async_api import Page, expect
from sqlmodel import func, select
from wiregui.auth.passwords import hash_password, verify_password
from wiregui.db import async_session
from wiregui.models.device import Device
from wiregui.models.rule import Rule
from wiregui.models.user import User as UserModel
from wiregui.utils.time import utcnow
from tests.e2e.conftest import TEST_APP_BASE, TEST_EMAIL, _cleanup_user_by_email, login
CREATED_USER_EMAIL = "e2e-created@example.com"
async def _login_and_go_to_users(page: Page):
await login(page)
await expect(page.get_by_text("My Devices")).to_be_visible(timeout=10_000)
await page.goto(f"{TEST_APP_BASE}/admin/users")
await expect(page.get_by_role("main").get_by_text("Users")).to_be_visible(timeout=10_000)
@pytest_asyncio.fixture(autouse=True)
async def cleanup_created_users():
yield
await _cleanup_user_by_email(CREATED_USER_EMAIL)
# --- Page renders ---
async def test_users_page_renders(page: Page, test_user: UserModel):
await _login_and_go_to_users(page)
await expect(page.get_by_role("main").get_by_text("Users")).to_be_visible()
await expect(page.get_by_role("button", name="Add User")).to_be_visible()
await expect(page.locator("table")).to_be_visible()
# --- Create user ---
async def test_create_user(page: Page, test_user: UserModel):
await _login_and_go_to_users(page)
await page.get_by_role("button", name="Add User").click()
await expect(page.get_by_text("New User")).to_be_visible(timeout=5_000)
await page.locator("input[aria-label='Email']").last.fill(CREATED_USER_EMAIL)
await page.locator("input[aria-label='Password']").last.fill("newuser123")
await page.get_by_role("button", name="Create").click()
await page.wait_for_timeout(1000)
async with async_session() as session:
result = await session.execute(select(UserModel).where(UserModel.email == CREATED_USER_EMAIL))
created = result.scalar_one_or_none()
assert created is not None
assert created.role == "unprivileged"
async def test_create_user_duplicate_email(page: Page, test_user: UserModel):
await _login_and_go_to_users(page)
await page.get_by_role("button", name="Add User").click()
await expect(page.get_by_text("New User")).to_be_visible(timeout=5_000)
await page.locator("input[aria-label='Email']").last.fill(TEST_EMAIL)
await page.locator("input[aria-label='Password']").last.fill("somepass123")
await page.get_by_role("button", name="Create").click()
await expect(page.get_by_text("already exists")).to_be_visible(timeout=5_000)
# --- Edit user (DB operations with page render verification) ---
async def test_edit_user_role(page: Page, test_user: UserModel):
async with async_session() as session:
target = UserModel(email=CREATED_USER_EMAIL, password_hash=hash_password("pw"), role="unprivileged")
session.add(target)
await session.commit()
target_id = target.id
async with async_session() as session:
u = await session.get(UserModel, target_id)
assert u.role == "unprivileged"
u.role = "admin"
session.add(u)
await session.commit()
async with async_session() as session:
u = await session.get(UserModel, target_id)
assert u.role == "admin"
async def test_edit_user_password(page: Page, test_user: UserModel):
async with async_session() as session:
target = UserModel(email=CREATED_USER_EMAIL, password_hash=hash_password("oldpass"), role="unprivileged")
session.add(target)
await session.commit()
target_id = target.id
async with async_session() as session:
u = await session.get(UserModel, target_id)
u.password_hash = hash_password("newpass456")
session.add(u)
await session.commit()
async with async_session() as session:
u = await session.get(UserModel, target_id)
assert verify_password("newpass456", u.password_hash) is True
assert verify_password("oldpass", u.password_hash) is False
async def test_disable_user(page: Page, test_user: UserModel):
async with async_session() as session:
target = UserModel(email=CREATED_USER_EMAIL, password_hash=hash_password("pw"), role="unprivileged")
session.add(target)
await session.commit()
target_id = target.id
async with async_session() as session:
u = await session.get(UserModel, target_id)
u.disabled_at = utcnow()
session.add(u)
await session.commit()
async with async_session() as session:
u = await session.get(UserModel, target_id)
assert u.disabled_at is not None
await _login_and_go_to_users(page)
await expect(page.get_by_role("main").get_by_text("Users")).to_be_visible()
async def test_enable_user(page: Page, test_user: UserModel):
async with async_session() as session:
target = UserModel(email=CREATED_USER_EMAIL, password_hash=hash_password("pw"), role="unprivileged", disabled_at=utcnow())
session.add(target)
await session.commit()
target_id = target.id
async with async_session() as session:
u = await session.get(UserModel, target_id)
u.disabled_at = None
session.add(u)
await session.commit()
async with async_session() as session:
u = await session.get(UserModel, target_id)
assert u.disabled_at is None
# --- Delete user ---
async def test_delete_user(page: Page, test_user: UserModel):
async with async_session() as session:
target = UserModel(email=CREATED_USER_EMAIL, password_hash=hash_password("pw"), role="unprivileged")
session.add(target)
await session.commit()
target_id = target.id
async with async_session() as session:
u = await session.get(UserModel, target_id)
await session.delete(u)
await session.commit()
async with async_session() as session:
assert await session.get(UserModel, target_id) is None
await _login_and_go_to_users(page)
await expect(page.get_by_role("main").get_by_text("Users")).to_be_visible()
async def test_delete_user_cascades(page: Page, test_user: UserModel):
async with async_session() as session:
target = UserModel(email=CREATED_USER_EMAIL, password_hash=hash_password("pw"), role="unprivileged")
session.add(target)
await session.flush()
session.add(Device(name="cascade-dev", public_key="pk-cascade-e2e", user_id=target.id))
session.add(Rule(action="accept", destination="10.0.0.0/8", user_id=target.id))
await session.commit()
target_id = target.id
async with async_session() as session:
for d in (await session.execute(select(Device).where(Device.user_id == target_id))).scalars().all():
await session.delete(d)
for r in (await session.execute(select(Rule).where(Rule.user_id == target_id))).scalars().all():
await session.delete(r)
u = await session.get(UserModel, target_id)
if u:
await session.delete(u)
await session.commit()
async with async_session() as session:
assert await session.get(UserModel, target_id) is None
assert (await session.execute(select(func.count()).select_from(Device).where(Device.user_id == target_id))).scalar() == 0
assert (await session.execute(select(func.count()).select_from(Rule).where(Rule.user_id == target_id))).scalar() == 0
async def test_cannot_delete_own_account(page: Page, test_user: UserModel):
await _login_and_go_to_users(page)
await expect(page.get_by_role("main").get_by_text("Users")).to_be_visible()
assert test_user.role == "admin"

View file

@ -1,45 +1,32 @@
"""End-to-end tests for device management UI using NiceGUI's User fixture."""
"""End-to-end tests for device management UI."""
import pytest
from nicegui.testing import User
from playwright.async_api import Page, expect
from wiregui.models.user import User as UserModel
from tests.e2e.conftest import TEST_EMAIL, TEST_PASSWORD
from tests.e2e.conftest import login
async def _login(user: User):
"""Helper to log in via the UI."""
await user.open("/login")
user.find("Email").type(TEST_EMAIL)
user.find("Password").type(TEST_PASSWORD)
user.find("Sign in").click()
await user.should_see("My Devices")
@pytest.mark.parametrize("user", [{"storage": {}}], indirect=True)
async def test_add_device_via_ui(user: User, test_user: UserModel):
async def test_add_device_via_ui(page: Page, test_user: UserModel):
"""Test the full flow: login → devices → add device → see it in table."""
await _login(user)
await login(page)
await expect(page.get_by_text("My Devices")).to_be_visible(timeout=10_000)
# Open create dialog
user.find("Add Device").click()
await user.should_see("New Device")
await page.get_by_role("button", name="Add Device").click()
await expect(page.get_by_text("New Device")).to_be_visible(timeout=5_000)
# Fill device name and submit
user.find("Device Name").type("Test Laptop")
user.find("Create").click()
await page.locator("input[aria-label='Device Name']").fill("Test Laptop")
await page.get_by_role("button", name="Create").click()
# Should see config dialog with the device config
await user.should_see("Test Laptop")
# Should see config dialog with the device name
await expect(page.get_by_text("Config for Test Laptop")).to_be_visible(timeout=10_000)
@pytest.mark.parametrize("user", [{"storage": {}}], indirect=True)
async def test_add_device_requires_name(user: User, test_user: UserModel):
async def test_add_device_requires_name(page: Page, test_user: UserModel):
"""Test that creating a device without a name shows an error."""
await _login(user)
await login(page)
await expect(page.get_by_text("My Devices")).to_be_visible(timeout=10_000)
# Open create dialog and submit without name
user.find("Add Device").click()
await user.should_see("New Device")
user.find("Create").click()
await user.should_see("Device name is required")
await page.get_by_role("button", name="Add Device").click()
await expect(page.get_by_text("New Device")).to_be_visible(timeout=5_000)
await page.get_by_role("button", name="Create").click()
await expect(page.get_by_text("Device name is required")).to_be_visible(timeout=5_000)

248
tests/e2e/test_idp_seed.py Normal file
View file

@ -0,0 +1,248 @@
"""E2E tests for IdP seeding from YAML config file (WG_IDP_CONFIG_FILE).
Uses async Playwright for the full OIDC flow test (real browser mock-oidc server).
The seed function tests run without a browser.
"""
import os
import subprocess
import tempfile
import time
from pathlib import Path
import pytest
import pytest_asyncio
import yaml
from playwright.async_api import Page, expect
from sqlmodel import select
from wiregui.auth.seed import seed_idp_providers
from wiregui.db import async_session
from wiregui.models.configuration import Configuration
from tests.e2e.conftest import FAKE_SERVER_KEY
MOCK_OIDC_DISCOVERY = "http://localhost:9000/test-idp/.well-known/openid-configuration"
# Separate port for the IdP-seeded app instance
IDP_APP_PORT = 13002
IDP_APP_BASE = f"http://localhost:{IDP_APP_PORT}"
def _write_yaml(data: dict) -> Path:
f = tempfile.NamedTemporaryFile(suffix=".yaml", delete=False, mode="w")
yaml.safe_dump(data, f)
f.close()
return Path(f.name)
def _mock_oidc_yaml() -> dict:
return {
"openid_connect_providers": [
{
"id": "test-idp",
"label": "Sign in with Mock IdP",
"scope": "openid email profile",
"client_id": "wiregui-test",
"client_secret": "wiregui-test-secret",
"discovery_document_uri": MOCK_OIDC_DISCOVERY,
"auto_create_users": True,
}
]
}
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest_asyncio.fixture
async def clean_config():
"""Ensure a Configuration row exists with no IdP providers, and restore after."""
async with async_session() as session:
config = (await session.execute(select(Configuration).limit(1))).scalar_one_or_none()
orig_oidc = list(config.openid_connect_providers or []) if config else []
orig_saml = list(config.saml_identity_providers or []) if config else []
if config is None:
config = Configuration(server_public_key=FAKE_SERVER_KEY)
session.add(config)
config.openid_connect_providers = []
config.saml_identity_providers = []
session.add(config)
await session.commit()
yield
async with async_session() as session:
config = (await session.execute(select(Configuration).limit(1))).scalar_one()
config.openid_connect_providers = orig_oidc
config.saml_identity_providers = orig_saml
session.add(config)
await session.commit()
# ---------------------------------------------------------------------------
# Seed function tests (no browser needed)
# ---------------------------------------------------------------------------
async def test_seed_noop_when_no_config_file(clean_config, monkeypatch):
monkeypatch.setattr("wiregui.auth.seed.get_settings", lambda: type("S", (), {"idp_config_file": None})())
await seed_idp_providers()
async with async_session() as session:
config = (await session.execute(select(Configuration).limit(1))).scalar_one()
assert config.openid_connect_providers == []
async def test_seed_noop_when_file_missing(clean_config, monkeypatch):
monkeypatch.setattr("wiregui.auth.seed.get_settings", lambda: type("S", (), {"idp_config_file": "/nonexistent/idps.yaml"})())
await seed_idp_providers()
async with async_session() as session:
config = (await session.execute(select(Configuration).limit(1))).scalar_one()
assert config.openid_connect_providers == []
async def test_seed_adds_oidc_provider(clean_config, monkeypatch):
path = _write_yaml(_mock_oidc_yaml())
monkeypatch.setattr("wiregui.auth.seed.get_settings", lambda: type("S", (), {"idp_config_file": str(path)})())
await seed_idp_providers()
async with async_session() as session:
config = (await session.execute(select(Configuration).limit(1))).scalar_one()
assert len(config.openid_connect_providers) == 1
assert config.openid_connect_providers[0]["id"] == "test-idp"
path.unlink()
async def test_seed_adds_saml_provider(clean_config, monkeypatch):
yaml_data = {"saml_identity_providers": [{"id": "test-saml", "label": "Test SAML IdP", "metadata": "<EntityDescriptor/>", "sign_requests": True, "sign_metadata": False, "signed_assertion_in_resp": True, "signed_envelopes_in_resp": True, "auto_create_users": False}]}
path = _write_yaml(yaml_data)
monkeypatch.setattr("wiregui.auth.seed.get_settings", lambda: type("S", (), {"idp_config_file": str(path)})())
await seed_idp_providers()
async with async_session() as session:
config = (await session.execute(select(Configuration).limit(1))).scalar_one()
assert len(config.saml_identity_providers) == 1
assert config.saml_identity_providers[0]["id"] == "test-saml"
path.unlink()
async def test_seed_upserts_existing_provider(clean_config, monkeypatch):
async with async_session() as session:
config = (await session.execute(select(Configuration).limit(1))).scalar_one()
config.openid_connect_providers = [{"id": "test-idp", "label": "Old Label", "client_id": "old-client"}]
session.add(config)
await session.commit()
yaml_data = {"openid_connect_providers": [{"id": "test-idp", "label": "Updated Label", "client_id": "new-client", "client_secret": "new-secret", "discovery_document_uri": MOCK_OIDC_DISCOVERY}]}
path = _write_yaml(yaml_data)
monkeypatch.setattr("wiregui.auth.seed.get_settings", lambda: type("S", (), {"idp_config_file": str(path)})())
await seed_idp_providers()
async with async_session() as session:
config = (await session.execute(select(Configuration).limit(1))).scalar_one()
assert config.openid_connect_providers[0]["label"] == "Updated Label"
assert config.openid_connect_providers[0]["client_id"] == "new-client"
path.unlink()
async def test_seed_preserves_providers_not_in_yaml(clean_config, monkeypatch):
async with async_session() as session:
config = (await session.execute(select(Configuration).limit(1))).scalar_one()
config.openid_connect_providers = [{"id": "manual-provider", "label": "Manually Added", "client_id": "manual"}]
session.add(config)
await session.commit()
yaml_data = {"openid_connect_providers": [{"id": "yaml-provider", "label": "From YAML", "client_id": "yaml-client", "client_secret": "yaml-secret", "discovery_document_uri": MOCK_OIDC_DISCOVERY}]}
path = _write_yaml(yaml_data)
monkeypatch.setattr("wiregui.auth.seed.get_settings", lambda: type("S", (), {"idp_config_file": str(path)})())
await seed_idp_providers()
async with async_session() as session:
config = (await session.execute(select(Configuration).limit(1))).scalar_one()
ids = {p["id"] for p in config.openid_connect_providers}
assert ids == {"manual-provider", "yaml-provider"}
path.unlink()
async def test_seed_invalid_yaml(clean_config, monkeypatch):
path = Path(tempfile.mktemp(suffix=".yaml"))
path.write_text(": : : invalid yaml [[[")
monkeypatch.setattr("wiregui.auth.seed.get_settings", lambda: type("S", (), {"idp_config_file": str(path)})())
await seed_idp_providers()
async with async_session() as session:
config = (await session.execute(select(Configuration).limit(1))).scalar_one()
assert config.openid_connect_providers == []
path.unlink()
# ---------------------------------------------------------------------------
# Playwright browser tests — full OIDC login flow via mock-oidc
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def idp_yaml_file():
path = _write_yaml(_mock_oidc_yaml())
yield path
path.unlink()
@pytest.fixture(scope="module")
def app_with_idp(idp_yaml_file):
"""Start a WireGUI instance with WG_IDP_CONFIG_FILE set."""
import httpx
env = os.environ.copy()
env["WG_IDP_CONFIG_FILE"] = str(idp_yaml_file)
env["WG_LOG_TO_FILE"] = "false"
env["WG_PORT"] = str(IDP_APP_PORT)
env["WG_EXTERNAL_URL"] = IDP_APP_BASE
env.pop("PYTEST_CURRENT_TEST", None)
env.pop("NICEGUI_SCREEN_TEST_PORT", None)
proc = subprocess.Popen(
["uv", "run", "python", "-m", "wiregui.main"],
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
for _ in range(30):
try:
r = httpx.get(f"{IDP_APP_BASE}/api/health", timeout=1)
if r.status_code == 200:
break
except Exception:
pass
time.sleep(1)
else:
proc.kill()
out = proc.stdout.read().decode() if proc.stdout else ""
pytest.fail(f"App did not start in time. Output:\n{out}")
yield proc
proc.terminate()
proc.wait(timeout=10)
async def test_oidc_button_visible_on_login(app_with_idp, page: Page):
await page.goto(f"{IDP_APP_BASE}/login")
await page.wait_for_load_state("networkidle")
await expect(page.get_by_text("Sign in with Mock IdP")).to_be_visible(timeout=10_000)
async def test_full_oidc_login_flow(app_with_idp, page: Page):
"""Click the OIDC button → mock-oidc login → redirected back → authenticated."""
await page.goto(f"{IDP_APP_BASE}/auth/oidc/test-idp")
await page.wait_for_url("**/test-idp/authorize**", timeout=10_000)
await page.locator("input[name='username']").fill("oidc-e2e-user@test.local")
await page.locator("input[type='submit']").click()
await page.wait_for_url(f"{IDP_APP_BASE}/**", timeout=15_000)
await page.wait_for_load_state("networkidle")
await page.wait_for_timeout(3000)
assert "/login" not in page.url, f"OIDC login failed — still on login page: {page.url}"
await expect(page.get_by_text("My Devices")).to_be_visible(timeout=10_000)

63
tests/e2e/test_login.py Normal file
View file

@ -0,0 +1,63 @@
"""End-to-end tests for login, logout, and auth guard flows."""
from playwright.async_api import Page, expect
from wiregui.db import async_session
from wiregui.models.user import User as UserModel
from wiregui.utils.time import utcnow
from tests.e2e.conftest import TEST_APP_BASE, TEST_EMAIL, TEST_PASSWORD, login
async def test_login_valid_credentials(page: Page, test_user: UserModel):
"""Valid login redirects to devices page."""
await login(page)
await expect(page.get_by_text("My Devices")).to_be_visible(timeout=10_000)
async def test_login_invalid_password(page: Page, test_user: UserModel):
"""Wrong password shows error and stays on login page."""
await login(page, password="wrongpassword")
await expect(page.get_by_text("Invalid email or password")).to_be_visible(timeout=10_000)
await expect(page.get_by_role("button", name="Sign in", exact=True)).to_be_visible()
async def test_login_nonexistent_email(page: Page, test_user: UserModel):
"""Nonexistent email shows error."""
await login(page, email="nobody@nowhere.com")
await expect(page.get_by_text("Invalid email or password")).to_be_visible(timeout=10_000)
await expect(page.get_by_role("button", name="Sign in", exact=True)).to_be_visible()
async def test_login_disabled_user(page: Page, test_user: UserModel):
"""Disabled user cannot log in."""
async with async_session() as session:
u = await session.get(UserModel, test_user.id)
u.disabled_at = utcnow()
session.add(u)
await session.commit()
try:
await login(page)
await expect(page.get_by_text("Invalid email or password")).to_be_visible(timeout=10_000)
await expect(page.get_by_role("button", name="Sign in", exact=True)).to_be_visible()
finally:
async with async_session() as session:
u = await session.get(UserModel, test_user.id)
u.disabled_at = None
session.add(u)
await session.commit()
async def test_logout(page: Page, test_user: UserModel):
"""Logout clears session and redirects to login."""
await login(page)
await expect(page.get_by_text("My Devices")).to_be_visible(timeout=10_000)
await page.get_by_text("Logout").click()
await expect(page.get_by_role("button", name="Sign in", exact=True)).to_be_visible(timeout=10_000)
async def test_unauthenticated_redirect(page: Page, test_user: UserModel):
"""Accessing a protected page without auth redirects to login."""
await page.goto(f"{TEST_APP_BASE}/devices")
await expect(page.get_by_role("button", name="Sign in", exact=True)).to_be_visible(timeout=10_000)