Compare commits

..

No commits in common. "5aff71ec4cab05362b4d275f771ffb01aa603a4e" and "0546b44507d8ed228b8423b91c9dc251aed69bb7" have entirely different histories.

7 changed files with 341 additions and 364 deletions

View file

@ -46,8 +46,6 @@ jobs:
needs: test needs: test
if: github.ref == 'refs/heads/main' && github.event_name == 'push' if: github.ref == 'refs/heads/main' && github.event_name == 'push'
runs-on: docker runs-on: docker
container:
image: node:20-slim
outputs: outputs:
new_tag: ${{ steps.version.outputs.new_tag }} new_tag: ${{ steps.version.outputs.new_tag }}
new_version: ${{ steps.version.outputs.new_version }} new_version: ${{ steps.version.outputs.new_version }}

2
.gitignore vendored
View file

@ -4,5 +4,3 @@ __pycache__/
.env .env
.nicegui/ .nicegui/
logs/ logs/
.idea/
.coverage

67
TODO.md
View file

@ -188,68 +188,9 @@ Source: `/home/stefanob/PycharmProjects/personal/wirezone`
- [x] Loguru configured (wiregui/logging.py), no print statements - [x] Loguru configured (wiregui/logging.py), no print statements
- [x] File logging to `logs/` when `WG_LOG_TO_FILE=true` - [x] File logging to `logs/` when `WG_LOG_TO_FILE=true`
### Deployment ✅ ### Deployment
- [x] Dockerfile (multi-stage python:3.13-slim) - [ ] Dockerfile (multi-stage)
- [x] compose.prod.yml (bridge networking, NET_ADMIN, nftables) - [ ] compose.prod.yml (app + postgres + valkey + caddy)
- [x] Health endpoint `GET /api/health` - [ ] Health endpoint `GET /api/health`
- [x] Forgejo CI: test → semver → Docker registry push
- [ ] First-run CLI setup command - [ ] First-run CLI setup command
- [ ] README.md - [ ] README.md
---
## UI Polish — Account Page (`/account`)
Redesign from tabbed layout to single scrollable page (matching original wirezone pattern).
Leverage Quasar components + Tailwind utility classes for modern look.
### Layout change
- [ ] Remove tabs — render all sections stacked vertically on one page
- [ ] Page header: "Account Settings" with subtitle description
### Section 1: Account Details
- [ ] Quasar `q-card` with clean table layout (not grid) for user info
- [ ] Rows: Email, Role, Last Sign-in, Method, Created
- [ ] Tailwind: rounded borders, hover states on rows, subtle dividers
- [ ] "Edit" button to open email change dialog (future)
### Section 2: Change Password
- [ ] Separate `q-card` below details
- [ ] Outlined inputs with proper validation feedback
- [ ] Min 8 chars, confirmation match check shown inline
- [ ] Success/error toast notifications
### Section 3: Connected SSO Providers
- [ ] `q-card` showing OIDC connections as a proper table
- [ ] Columns: Provider, Last Refreshed, Status
- [ ] "Disconnect" action per provider (future)
- [ ] Empty state: "No SSO providers connected"
### Section 4: Multi-Factor Authentication
- [ ] `q-card` with MFA methods table
- [ ] Columns: Name, Type, Last Used, Actions (delete)
- [ ] Styled delete button (red outline, confirmation dialog)
- [ ] "Add TOTP Method" and "Add Security Key" buttons below table
- [ ] TOTP registration renders inline (QR + verify code) inside an expansion panel
- [ ] Empty state with icon + message
### Section 5: API Tokens
- [ ] `q-card` with tokens table
- [ ] Columns: Created, Expires, Status (chip: green "Active" / red "Expired"), Actions
- [ ] Quasar `q-chip` for status badges
- [ ] Create token: inline row with expiry input + button (not a dialog)
- [ ] Token display after creation: `q-banner` with copy-to-clipboard button
- [ ] Empty state message
### Section 6: Danger Zone
- [ ] `q-card` with red left border accent (`border-l-4 border-red-500`)
- [ ] "Delete Your Account" button with `q-btn color=negative outline`
- [ ] Confirmation dialog with typed email verification
- [ ] Disabled if user is the only admin
### General styling improvements
- [ ] Consistent card spacing (`q-mt-lg` between sections)
- [ ] Section titles: `text-h6 text-weight-medium`
- [ ] Descriptive subtitles below each section title in `text-caption text-grey-7`
- [ ] Responsive: max-width container centered (`max-w-3xl mx-auto`)
- [ ] Smooth scroll between sections

View file

@ -39,8 +39,6 @@ services:
postgres: postgres:
image: postgres:17 image: postgres:17
restart: unless-stopped restart: unless-stopped
ports:
- "5432:5432"
environment: environment:
POSTGRES_USER: wiregui POSTGRES_USER: wiregui
POSTGRES_PASSWORD: wiregui POSTGRES_PASSWORD: wiregui

View file

@ -29,7 +29,6 @@ def _make_device(**kwargs) -> Device:
async def test_on_device_created_calls_add_peer(mock_wg, mock_fw, mock_settings): async def test_on_device_created_calls_add_peer(mock_wg, mock_fw, mock_settings):
mock_settings.return_value.wg_enabled = True mock_settings.return_value.wg_enabled = True
mock_wg.add_peer = AsyncMock() mock_wg.add_peer = AsyncMock()
mock_fw.add_user_chain = AsyncMock()
mock_fw.add_device_jump_rule = AsyncMock() mock_fw.add_device_jump_rule = AsyncMock()
device = _make_device() device = _make_device()

View file

@ -1,12 +1,12 @@
"""User account page — compact, single-page layout matching Firezone's density.""" """User account page — password change, MFA management, API tokens."""
import json
from datetime import timedelta
from uuid import UUID from uuid import UUID
from loguru import logger from loguru import logger
from nicegui import app, ui from nicegui import app, ui
from sqlmodel import func, select from sqlmodel import select
import json
from wiregui.auth.api_token import generate_api_token from wiregui.auth.api_token import generate_api_token
from wiregui.auth.mfa import generate_totp_qr_svg, generate_totp_secret, get_totp_uri, verify_totp_code from wiregui.auth.mfa import generate_totp_qr_svg, generate_totp_secret, get_totp_uri, verify_totp_code
@ -14,37 +14,13 @@ from wiregui.auth.passwords import hash_password, verify_password
from wiregui.auth.webauthn import create_registration_options, verify_registration from wiregui.auth.webauthn import create_registration_options, verify_registration
from wiregui.db import async_session from wiregui.db import async_session
from wiregui.models.api_token import ApiToken from wiregui.models.api_token import ApiToken
from wiregui.models.device import Device
from wiregui.models.mfa_method import MFAMethod from wiregui.models.mfa_method import MFAMethod
from wiregui.models.oidc_connection import OIDCConnection from wiregui.models.oidc_connection import OIDCConnection
from wiregui.models.rule import Rule
from wiregui.models.user import User from wiregui.models.user import User
from wiregui.pages.layout import layout from wiregui.pages.layout import layout
from wiregui.utils.time import utcnow from wiregui.utils.time import utcnow
def _section_header(title: str, description: str = ""):
"""Compact section header — bold title + optional subtitle."""
ui.label(title).classes("text-lg text-weight-bold q-mt-lg q-mb-none")
if description:
ui.label(description).classes("text-caption text-grey-7 q-mb-sm")
def _kv_row(label: str, value):
"""Compact key-value row."""
with ui.row().classes("w-full items-baseline gap-4"):
ui.label(label).classes("text-sm text-grey-8 w-36 shrink-0")
if isinstance(value, str):
ui.label(value).classes("text-sm")
# Consistent button style: proper padding, readable size, no cramped text
BTN = "unelevated padding=8px 20px"
BTN_PRIMARY = f"color=primary {BTN}"
BTN_OUTLINE = f"outline color=primary {BTN}"
BTN_DANGER = f"color=negative {BTN}"
@ui.page("/account") @ui.page("/account")
async def account_page(): async def account_page():
if not app.storage.user.get("authenticated"): if not app.storage.user.get("authenticated"):
@ -55,291 +31,358 @@ async def account_page():
async with async_session() as session: async with async_session() as session:
user = await session.get(User, user_id) user = await session.get(User, user_id)
device_count = (await session.execute(
select(func.count()).select_from(Device).where(Device.user_id == user_id)
)).scalar()
rule_count = (await session.execute(
select(func.count()).select_from(Rule).where(Rule.user_id == user_id)
)).scalar()
oidc_conns = (await session.execute(
select(OIDCConnection).where(OIDCConnection.user_id == user_id)
)).scalars().all()
with ui.column().classes("w-full p-6"): with ui.column().classes("w-full p-4"):
ui.label("Account Settings").classes("text-sm text-weight-bold") ui.label("Account Settings").classes("text-h5 q-mb-md")
ui.label("Configure settings related to your WireGUI account.").classes("text-caption text-grey-7")
# ===== Details ===== with ui.tabs().classes("w-full") as tabs:
_section_header("Details") profile_tab = ui.tab("Profile")
with ui.column().classes("gap-1"): mfa_tab = ui.tab("Two-Factor Auth")
_kv_row("Email", user.email) tokens_tab = ui.tab("API Tokens")
_kv_row("Role", user.role)
_kv_row("Last Signed In", str(user.last_signed_in_at)[:19] if user.last_signed_in_at else "-")
_kv_row("Created", str(user.inserted_at)[:19])
_kv_row("Number of Devices", str(device_count))
_kv_row("Number of Rules", str(rule_count))
ui.button("Change Email or Password", icon="edit", on_click=lambda: pw_dialog.open()).props( with ui.tab_panels(tabs, value=profile_tab).classes("w-full"):
BTN_PRIMARY
).classes("q-mt-sm")
# ===== SSO ===== # === Profile ===
if oidc_conns: with ui.tab_panel(profile_tab):
_section_header("Connected SSO Providers") with ui.card().classes("w-full"):
cols = [ ui.label("Account Details").classes("text-subtitle1 text-bold")
{"name": "provider", "label": "Provider", "field": "provider", "align": "left"}, ui.separator()
{"name": "refreshed", "label": "Last Refreshed", "field": "refreshed", "align": "left"}, with ui.grid(columns=2).classes("w-full gap-2 q-pa-sm"):
] ui.label("Email:").classes("text-bold")
rows = [{"provider": c.provider, "refreshed": str(c.refreshed_at)[:19] if c.refreshed_at else "Never"} for c in oidc_conns] ui.label(user.email)
ui.table(columns=cols, rows=rows, row_key="provider").props("dense flat bordered").classes("w-full text-xs") ui.label("Role:").classes("text-bold")
ui.label(user.role)
ui.label("Last Sign-in:").classes("text-bold")
ui.label(str(user.last_signed_in_at)[:19] if user.last_signed_in_at else "-")
ui.label("Method:").classes("text-bold")
ui.label(user.last_signed_in_method or "-")
# ===== API Tokens ===== with ui.card().classes("w-full q-mt-md"):
_section_header("API Tokens", "Manage API tokens.") ui.label("Change Password").classes("text-subtitle1 text-bold")
ui.separator()
tokens_container = ui.column().classes("w-full") current_pw = ui.input("Current Password", password=True, password_toggle_button=True).props("outlined dense").classes("w-full")
token_banner = ui.column().classes("w-full") new_pw = ui.input("New Password", password=True, password_toggle_button=True).props("outlined dense").classes("w-full")
confirm_pw = ui.input("Confirm New Password", password=True, password_toggle_button=True).props("outlined dense").classes("w-full")
async def refresh_tokens(): async def change_password():
async with async_session() as session: if not current_pw.value or not new_pw.value:
tokens = (await session.execute( ui.notify("Fill in all password fields", type="negative")
select(ApiToken).where(ApiToken.user_id == user_id).order_by(ApiToken.inserted_at.desc()) return
)).scalars().all() if new_pw.value != confirm_pw.value:
tokens_container.clear() ui.notify("New passwords do not match", type="negative")
with tokens_container: return
if tokens: if len(new_pw.value) < 8:
cols = [ ui.notify("Password must be at least 8 characters", type="negative")
{"name": "created", "label": "Created", "field": "created", "align": "left"}, return
{"name": "expires", "label": "Expires", "field": "expires", "align": "left"},
{"name": "status", "label": "Status", "field": "status", "align": "left"},
{"name": "actions", "label": "", "field": "id", "align": "center"},
]
rows = [{
"id": str(t.id),
"created": str(t.inserted_at)[:19],
"expires": str(t.expires_at)[:19] if t.expires_at else "Never",
"status": "Expired" if t.expires_at and t.expires_at < utcnow() else "Active",
} for t in tokens]
tbl = ui.table(columns=cols, rows=rows, row_key="id").props("dense flat bordered").classes("w-full text-xs")
tbl.add_slot("body-cell-status", r'''<q-td :props="props"><q-badge :color="props.row.status === 'Active' ? 'positive' : 'negative'" :label="props.row.status" /></q-td>''')
tbl.add_slot("body-cell-actions", r'''<q-td :props="props"><q-btn flat dense icon="delete" color="negative" size="xs" @click.stop="() => $parent.$emit('delete', props.row.id)" /></q-td>''')
tbl.on("delete", lambda e: delete_token(e.args))
else:
ui.label("No API tokens.").classes("text-sm text-grey-7")
async def create_token(): async with async_session() as session:
days = int(token_days.value) if token_days.value else 30 u = await session.get(User, user_id)
plaintext, token_hash = generate_api_token() if not verify_password(current_pw.value, u.password_hash):
expires_at = utcnow() + timedelta(days=days) if days > 0 else None ui.notify("Current password is incorrect", type="negative")
async with async_session() as session: return
session.add(ApiToken(token_hash=token_hash, expires_at=expires_at, user_id=user_id)) u.password_hash = hash_password(new_pw.value)
session.add(u)
await session.commit()
logger.info("Password changed for {}", user.email)
ui.notify("Password changed", type="positive")
current_pw.value = ""
new_pw.value = ""
confirm_pw.value = ""
ui.button("Change Password", on_click=change_password).props("color=primary").classes("q-mt-sm")
# OIDC connections
async with async_session() as session:
oidc_conns = (await session.execute(
select(OIDCConnection).where(OIDCConnection.user_id == user_id)
)).scalars().all()
if oidc_conns:
with ui.card().classes("w-full q-mt-md"):
ui.label("Connected SSO Providers").classes("text-subtitle1 text-bold")
ui.separator()
for conn in oidc_conns:
with ui.row().classes("w-full items-center justify-between q-pa-xs"):
ui.label(f"{conn.provider}").classes("text-bold")
ui.label(f"Last refreshed: {str(conn.refreshed_at)[:19] if conn.refreshed_at else 'Never'}")
# === MFA ===
with ui.tab_panel(mfa_tab):
await _render_mfa_panel(user_id, user.email)
# === API Tokens ===
with ui.tab_panel(tokens_tab):
await _render_tokens_panel(user_id)
async def _render_mfa_panel(user_id: UUID, email: str):
"""Render the MFA management tab."""
async def load_methods():
async with async_session() as session:
result = await session.execute(
select(MFAMethod).where(MFAMethod.user_id == user_id).order_by(MFAMethod.inserted_at)
)
return result.scalars().all()
async def refresh_methods():
methods = await load_methods()
methods_container.clear()
with methods_container:
if methods:
for m in methods:
with ui.row().classes("w-full items-center justify-between q-pa-xs"):
with ui.row().classes("items-center gap-2"):
ui.icon("security").props("color=primary")
ui.label(m.name).classes("text-bold")
ui.label(f"({m.type})").classes("text-caption text-grey-7")
with ui.row().classes("items-center gap-2"):
ui.label(f"Last used: {str(m.last_used_at)[:19] if m.last_used_at else 'Never'}").classes("text-caption")
ui.button(icon="delete", on_click=lambda mid=m.id: delete_method(mid)).props("flat dense color=negative")
ui.separator()
else:
ui.label("No MFA methods configured.").classes("text-caption text-grey-7 q-pa-sm")
async def delete_method(method_id):
async with async_session() as session:
m = await session.get(MFAMethod, method_id)
if m and m.user_id == user_id:
await session.delete(m)
await session.commit() await session.commit()
logger.info("API token created (expires in {} days)", days) logger.info("MFA method deleted for user {}", email)
token_banner.clear() ui.notify("MFA method removed")
with token_banner: await refresh_methods()
with ui.row().classes("w-full items-center bg-green-1 rounded px-3 py-1.5 gap-2 q-mb-xs text-xs"):
ui.icon("check_circle", color="positive").props("size=xs")
ui.label("Copy now — won't be shown again.").classes("text-weight-medium")
with ui.row().classes("w-full items-center gap-1 q-mb-sm"):
ui.input(value=plaintext).props("readonly outlined dense").classes("w-full font-mono").style("font-size: 0.75rem")
ui.button(icon="content_copy", on_click=lambda: _copy(plaintext)).props("flat dense size=sm")
await refresh_tokens()
async def _copy(text): # Registration state
await ui.run_javascript(f"navigator.clipboard.writeText('{text}')") registration = {"secret": None}
ui.notify("Copied", type="positive")
async def delete_token(token_id): def start_registration():
async with async_session() as session: secret = generate_totp_secret()
t = await session.get(ApiToken, UUID(token_id)) registration["secret"] = secret
if t and t.user_id == user_id: uri = get_totp_uri(secret, email)
await session.delete(t) svg = generate_totp_qr_svg(uri)
reg_container.clear()
with reg_container:
ui.label("Scan this QR code with your authenticator app:").classes("text-body2")
ui.html(svg).classes("w-64 q-my-sm")
ui.label(f"Or enter this secret manually: {secret}").classes("text-caption font-mono")
reg_name_input = ui.input("Method Name", value="Authenticator").props("outlined dense").classes("w-full")
reg_code_input = ui.input("Verification Code", placeholder="Enter 6-digit code").props("outlined dense maxlength=6").classes("w-full")
async def verify_and_save():
code = reg_code_input.value.strip()
name = reg_name_input.value.strip() or "Authenticator"
if not verify_totp_code(registration["secret"], code):
ui.notify("Invalid code — check your authenticator", type="negative")
return
async with async_session() as session:
method = MFAMethod(
name=name,
type="totp",
payload={"secret": registration["secret"]},
user_id=user_id,
)
session.add(method)
await session.commit() await session.commit()
ui.notify("Token deleted")
await refresh_tokens()
await refresh_tokens() logger.info("MFA TOTP registered for {}", email)
with ui.row().classes("items-center gap-3 q-mt-sm"): ui.notify("MFA method added!", type="positive")
token_days = ui.input("Expires in days", value="30").props("outlined dense").classes("w-36") registration["secret"] = None
ui.button("+ Add API Token", on_click=create_token).props(BTN_OUTLINE) reg_container.clear()
await refresh_methods()
# ===== MFA ===== ui.button("Verify & Save", on_click=verify_and_save).props("color=primary").classes("q-mt-sm")
_section_header("Multi Factor Authentication", "Your MFA methods are invoked when login with username and password.") ui.button("Cancel", on_click=lambda: reg_container.clear()).props("flat")
with ui.card().classes("w-full"):
ui.label("Two-Factor Authentication Methods").classes("text-subtitle1 text-bold")
ui.separator()
methods_container = ui.column().classes("w-full") methods_container = ui.column().classes("w-full")
reg_container = ui.column().classes("w-full")
registration = {"secret": None}
webauthn_state = {"challenge": None}
async def refresh_methods():
async with async_session() as session:
methods = (await session.execute(
select(MFAMethod).where(MFAMethod.user_id == user_id).order_by(MFAMethod.inserted_at)
)).scalars().all()
methods_container.clear()
with methods_container:
if methods:
cols = [
{"name": "name", "label": "Name", "field": "name", "align": "left"},
{"name": "type", "label": "Type", "field": "type", "align": "left"},
{"name": "last_used", "label": "Last Used", "field": "last_used", "align": "left"},
{"name": "actions", "label": "", "field": "id", "align": "center"},
]
rows = [{"id": str(m.id), "name": m.name, "type": m.type.upper(), "last_used": str(m.last_used_at)[:19] if m.last_used_at else "Never"} for m in methods]
tbl = ui.table(columns=cols, rows=rows, row_key="id").props("dense flat bordered").classes("w-full text-xs")
tbl.add_slot("body-cell-actions", r'''<q-td :props="props"><q-btn flat dense label="Delete" color="warning" size="xs" @click.stop="() => $parent.$emit('delete', props.row.id)" /></q-td>''')
tbl.on("delete", lambda e: _confirm_del_mfa(e.args))
else:
ui.label("No MFA methods added.").classes("text-sm text-grey-7")
async def _confirm_del_mfa(mid):
with ui.dialog(value=True) as dlg:
with ui.card().classes("w-72"):
ui.label("Remove MFA method?").classes("text-subtitle2")
with ui.row().classes("w-full justify-end q-mt-sm gap-2"):
ui.button("Cancel", on_click=dlg.close).props("flat dense size=sm")
ui.button("Remove", on_click=lambda: _del_mfa(mid, dlg)).props("color=negative dense size=sm unelevated")
async def _del_mfa(mid, dlg):
async with async_session() as session:
m = await session.get(MFAMethod, UUID(mid))
if m and m.user_id == user_id:
await session.delete(m)
await session.commit()
dlg.close()
ui.notify("Removed")
await refresh_methods()
def start_totp():
secret = generate_totp_secret()
registration["secret"] = secret
svg = generate_totp_qr_svg(get_totp_uri(secret, user.email))
reg_container.clear()
with reg_container:
ui.separator().classes("q-my-sm")
with ui.row().classes("items-start gap-4"):
ui.html(svg).style("width: 140px; height: 140px")
with ui.column().classes("gap-2"):
ui.label("Scan or enter manually:").classes("text-xs")
ui.label(secret).classes("text-xs font-mono bg-grey-2 px-2 py-1 rounded")
reg_name = ui.input("Name", value="Authenticator").props("outlined dense").classes("w-52").style("font-size: 0.8rem")
reg_code = ui.input("6-digit code").props("outlined dense maxlength=6").classes("w-52").style("font-size: 0.8rem")
async def verify():
if not verify_totp_code(registration["secret"], reg_code.value.strip()):
ui.notify("Invalid code", type="negative")
return
async with async_session() as session:
session.add(MFAMethod(name=reg_name.value.strip() or "Authenticator", type="totp", payload={"secret": registration["secret"]}, user_id=user_id))
await session.commit()
ui.notify("TOTP added!", type="positive")
reg_container.clear()
await refresh_methods()
with ui.row().classes("gap-2"):
ui.button("Verify & Save", on_click=verify).props(BTN_PRIMARY)
ui.button("Cancel", on_click=lambda: reg_container.clear()).props("flat dense size=sm")
async def start_webauthn():
existing = []
async with async_session() as session:
existing = [m.payload for m in (await session.execute(
select(MFAMethod).where(MFAMethod.user_id == user_id, MFAMethod.type.in_(["native", "portable"]))
)).scalars().all()]
try:
reg_data = create_registration_options(user_id, user.email, existing)
except Exception as e:
ui.notify(f"WebAuthn unavailable: {e}", type="negative")
return
webauthn_state["challenge"] = reg_data["challenge"]
js = f"""(async()=>{{try{{const o=JSON.parse('{reg_data["options_json"]}');o.challenge=Uint8Array.from(atob(o.challenge.replace(/-/g,'+').replace(/_/g,'/')),c=>c.charCodeAt(0));o.user.id=Uint8Array.from(atob(o.user.id.replace(/-/g,'+').replace(/_/g,'/')),c=>c.charCodeAt(0));if(o.excludeCredentials)o.excludeCredentials=o.excludeCredentials.map(c=>({{...c,id:Uint8Array.from(atob(c.id.replace(/-/g,'+').replace(/_/g,'/')),h=>h.charCodeAt(0))}}));const cr=await navigator.credentials.create({{publicKey:o}});return JSON.stringify({{id:cr.id,rawId:btoa(String.fromCharCode(...new Uint8Array(cr.rawId))).replace(/\\+/g,'-').replace(/\\//g,'_').replace(/=/g,''),type:cr.type,response:{{attestationObject:btoa(String.fromCharCode(...new Uint8Array(cr.response.attestationObject))).replace(/\\+/g,'-').replace(/\\//g,'_').replace(/=/g,''),clientDataJSON:btoa(String.fromCharCode(...new Uint8Array(cr.response.clientDataJSON))).replace(/\\+/g,'-').replace(/\\//g,'_').replace(/=/g,'')}}}})}}catch(e){{return JSON.stringify({{error:e.message}})}}}})()\n"""
result = await ui.run_javascript(js)
try:
data = json.loads(result)
except Exception:
ui.notify("WebAuthn error", type="negative")
return
if "error" in data:
ui.notify(f"WebAuthn: {data['error']}", type="negative")
return
try:
cred = verify_registration(result, webauthn_state["challenge"])
except Exception as e:
ui.notify(f"Failed: {e}", type="negative")
return
async with async_session() as session:
session.add(MFAMethod(name="Security Key", type="portable", payload=cred, user_id=user_id))
await session.commit()
ui.notify("Security key registered!", type="positive")
await refresh_methods()
await refresh_methods() await refresh_methods()
with ui.row().classes("gap-2 q-mt-xs"):
ui.button("+ Add MFA Method", on_click=start_totp).props(BTN_OUTLINE)
ui.button("+ Add Security Key", on_click=lambda: start_webauthn()).props(BTN_OUTLINE)
# ===== Danger Zone ===== with ui.row().classes("q-mt-sm gap-2"):
_section_header("Danger Zone") ui.button("Add TOTP Method", icon="add", on_click=start_registration).props("outline")
ui.button("Add Security Key", icon="key", on_click=lambda: start_webauthn_registration()).props("outline")
reg_container = ui.column().classes("w-full q-mt-md")
webauthn_state = {"challenge": None}
async def start_webauthn_registration():
# Get existing webauthn credentials to exclude
existing = []
async with async_session() as session: async with async_session() as session:
admin_count = (await session.execute(select(func.count()).select_from(User).where(User.role == "admin"))).scalar() from sqlmodel import select as sel
is_only_admin = user.role == "admin" and admin_count <= 1 result = await session.execute(
sel(MFAMethod).where(MFAMethod.user_id == user_id, MFAMethod.type.in_(["native", "portable"]))
)
for m in result.scalars().all():
existing.append(m.payload)
async def confirm_delete(): try:
with ui.dialog(value=True) as dlg: reg_data = create_registration_options(user_id, email, existing)
with ui.card().classes("w-80"): except Exception as e:
ui.label("Are you sure?").classes("text-subtitle2 text-negative") ui.notify(f"WebAuthn not available: {e}", type="negative")
ui.label(f"Type {user.email} to confirm:").classes("text-xs q-my-xs") return
ci = ui.input().props("outlined dense").classes("w-full").style("font-size: 0.8rem")
async def do_del():
if ci.value.strip() != user.email:
ui.notify("Email doesn't match", type="negative")
return
async with async_session() as session:
for model in (Device, Rule, MFAMethod, ApiToken, OIDCConnection):
for item in (await session.execute(select(model).where(model.user_id == user_id))).scalars().all():
await session.delete(item)
u = await session.get(User, user_id)
if u:
await session.delete(u)
await session.commit()
dlg.close()
app.storage.user.clear()
ui.navigate.to("/login")
with ui.row().classes("w-full justify-end q-mt-sm gap-2"):
ui.button("Cancel", on_click=dlg.close).props("flat dense size=sm")
ui.button("Delete", on_click=do_del).props("color=negative dense size=sm unelevated")
ui.button("Delete Your Account", icon="delete", on_click=confirm_delete).props( webauthn_state["challenge"] = reg_data["challenge"]
BTN_DANGER + (" disable" if is_only_admin else "") options_json = reg_data["options_json"]
# Call browser's navigator.credentials.create() via JavaScript
js = f"""
async function() {{
try {{
const options = JSON.parse('{options_json}');
// Convert base64url strings to ArrayBuffers
options.challenge = Uint8Array.from(atob(options.challenge.replace(/-/g,'+').replace(/_/g,'/')), c => c.charCodeAt(0));
options.user.id = Uint8Array.from(atob(options.user.id.replace(/-/g,'+').replace(/_/g,'/')), c => c.charCodeAt(0));
if (options.excludeCredentials) {{
options.excludeCredentials = options.excludeCredentials.map(c => ({{
...c,
id: Uint8Array.from(atob(c.id.replace(/-/g,'+').replace(/_/g,'/')), ch => ch.charCodeAt(0))
}}));
}}
const credential = await navigator.credentials.create({{publicKey: options}});
// Serialize the response
const response = {{
id: credential.id,
rawId: btoa(String.fromCharCode(...new Uint8Array(credential.rawId))).replace(/\\+/g,'-').replace(/\\//g,'_').replace(/=/g,''),
type: credential.type,
response: {{
attestationObject: btoa(String.fromCharCode(...new Uint8Array(credential.response.attestationObject))).replace(/\\+/g,'-').replace(/\\//g,'_').replace(/=/g,''),
clientDataJSON: btoa(String.fromCharCode(...new Uint8Array(credential.response.clientDataJSON))).replace(/\\+/g,'-').replace(/\\//g,'_').replace(/=/g,''),
}},
}};
return JSON.stringify(response);
}} catch(e) {{
return JSON.stringify({{"error": e.message}});
}}
}}
"""
result = await ui.run_javascript(f"({js})()")
await _handle_webauthn_response(result)
async def _handle_webauthn_response(result_json: str):
try:
result = json.loads(result_json)
except (json.JSONDecodeError, TypeError):
ui.notify("WebAuthn response error", type="negative")
return
if "error" in result:
ui.notify(f"WebAuthn failed: {result['error']}", type="negative")
return
challenge = webauthn_state.get("challenge")
if not challenge:
ui.notify("No pending WebAuthn challenge", type="negative")
return
try:
credential_data = verify_registration(result_json, challenge)
except Exception as e:
ui.notify(f"Verification failed: {e}", type="negative")
return
async with async_session() as session:
method = MFAMethod(
name="Security Key",
type="portable",
payload=credential_data,
user_id=user_id,
)
session.add(method)
await session.commit()
logger.info("WebAuthn key registered for {}", email)
ui.notify("Security key registered!", type="positive")
webauthn_state["challenge"] = None
await refresh_methods()
async def _render_tokens_panel(user_id: UUID):
"""Render the API tokens tab."""
async def load_tokens():
async with async_session() as session:
result = await session.execute(
select(ApiToken).where(ApiToken.user_id == user_id).order_by(ApiToken.inserted_at.desc())
)
return result.scalars().all()
async def refresh_tokens():
tokens = await load_tokens()
token_table.rows = [
{
"id": str(t.id),
"created": str(t.inserted_at)[:19],
"expires": str(t.expires_at)[:19] if t.expires_at else "Never",
"status": "Expired" if t.expires_at and t.expires_at < utcnow() else "Active",
}
for t in tokens
]
token_table.update()
async def create_token():
from datetime import timedelta
days = int(token_days.value) if token_days.value else 30
plaintext, token_hash = generate_api_token()
expires_at = utcnow() + timedelta(days=days) if days > 0 else None
async with async_session() as session:
token = ApiToken(token_hash=token_hash, expires_at=expires_at, user_id=user_id)
session.add(token)
await session.commit()
logger.info("API token created (expires in {} days)", days)
# Show the token once
with ui.dialog(value=True) as token_dialog:
with ui.card().classes("w-96"):
ui.label("API Token Created").classes("text-h6")
ui.label("Copy this token now — it won't be shown again.").classes("text-caption text-negative")
ui.input(value=plaintext).props("readonly outlined dense").classes("w-full font-mono q-mt-sm")
ui.button("Close", on_click=token_dialog.close).props("flat").classes("w-full q-mt-sm")
await refresh_tokens()
async def delete_token(token_id: str):
async with async_session() as session:
t = await session.get(ApiToken, UUID(token_id))
if t and t.user_id == user_id:
await session.delete(t)
await session.commit()
ui.notify("Token deleted")
await refresh_tokens()
with ui.card().classes("w-full"):
ui.label("API Tokens").classes("text-subtitle1 text-bold")
ui.separator()
ui.label("Use API tokens for programmatic access to the REST API.").classes("text-caption text-grey-7")
token_columns = [
{"name": "created", "label": "Created", "field": "created", "align": "left"},
{"name": "expires", "label": "Expires", "field": "expires", "align": "left"},
{"name": "status", "label": "Status", "field": "status", "align": "left"},
{"name": "actions", "label": "", "field": "id", "align": "center"},
]
token_table = ui.table(columns=token_columns, rows=[], row_key="id").classes("w-full")
token_table.add_slot(
"body-cell-actions",
'''
<q-td :props="props">
<q-btn flat dense icon="delete" color="negative"
@click.stop="() => $parent.$emit('delete', props.row.id)" />
</q-td>
''',
) )
token_table.on("delete", lambda e: delete_token(e.args))
# Password dialog with ui.row().classes("items-center gap-2 q-mt-sm"):
with ui.dialog() as pw_dialog: token_days = ui.input("Expires in (days)", value="30").props("outlined dense").classes("w-40")
with ui.card().classes("w-96"): ui.button("Create Token", icon="add", on_click=create_token).props("color=primary")
ui.label("Change Email or Password").classes("text-subtitle1 text-weight-medium")
ui.separator() await refresh_tokens()
cur = ui.input("Current Password", password=True, password_toggle_button=True).props("outlined dense").classes("w-full")
npw = ui.input("New Password", password=True, password_toggle_button=True).props("outlined dense").classes("w-full")
cpw = ui.input("Confirm Password", password=True, password_toggle_button=True).props("outlined dense").classes("w-full")
async def save_pw():
if not cur.value or not npw.value:
ui.notify("All fields required", type="negative")
return
if npw.value != cpw.value:
ui.notify("Passwords don't match", type="negative")
return
if len(npw.value) < 8:
ui.notify("Min 8 characters", type="negative")
return
async with async_session() as session:
u = await session.get(User, user_id)
if not verify_password(cur.value, u.password_hash):
ui.notify("Wrong current password", type="negative")
return
u.password_hash = hash_password(npw.value)
session.add(u)
await session.commit()
ui.notify("Password changed", type="positive")
pw_dialog.close()
with ui.row().classes("w-full justify-end q-mt-sm"):
ui.button("Cancel", on_click=pw_dialog.close).props("flat")
ui.button("Save", on_click=save_pw).props("color=primary unelevated")

View file

@ -12,7 +12,7 @@ from wiregui.models.connectivity_check import ConnectivityCheck
from wiregui.services import notifications from wiregui.services import notifications
from wiregui.utils.time import utcnow from wiregui.utils.time import utcnow
DEFAULT_URL = "https://one.one.one.one/cdn-cgi/trace" DEFAULT_URL = "https://ping-dev.firezone.dev"
DEFAULT_INTERVAL = 300 # 5 minutes DEFAULT_INTERVAL = 300 # 5 minutes