Help for web.auth

Sample CLI

gway web.app setup

References

Full Code

def setup(*,
    app=None,
    project="web.site",
    path=None,
    home: str = None,
    views: str = "view", 
    apis: str = "api",
    static="static",
    shared="shared",
    css="global",           # Default CSS (without .css extension)
    js="global",            # Default JS  (without .js extension)
    auth_required=False,    # Default: Don't enforce --optional security
    engine="bottle",
):
    """
    Setup Bottle web application with symmetrical static/shared public folders.
    Only one project per app. CSS/JS params are used as the only static includes.
    """
    global _ver, _homes, _enabled

    if engine != "bottle":
        raise NotImplementedError("Only Bottle is supported at the moment.")

    _ver = _ver or gw.version()
    bottle.BaseRequest.MEMFILE_MAX = UPLOAD_MB * 1024 * 1024

    if not isinstance(project, str) or not project:
        gw.abort("Project must be a non-empty string.")

    # Track project for later global static collection
    _enabled.add(project)

    # Always use the given project, never a list
    try:
        source = gw[project]
    except Exception:
        gw.abort(f"Project {project} not found in Gateway during app setup.")

    # Default path is the dotted project name, minus any leading web/
    if path is None:
        path = project.replace('.', '/')
        if path.startswith('web/'):
            path = path.removeprefix('web/')
            
    is_new_app = not (app := gw.unwrap_one(app, Bottle) if (oapp := app) else None)
    if is_new_app:
        gw.info("No Bottle app found; creating a new Bottle app.")
        app = Bottle()
        _homes.clear()
        add_home(home, path)

        @app.route("/", method=["GET", "POST"])
        def index():
            response.status = 302
            response.set_header("Location", default_home())
            return ""

        @app.error(404)
        def handle_404(error):
            return gw.web.error.redirect(f"404 Not Found: {request.url}", err=error)
    
    elif home:
        add_home(home, path)

    # Serve shared files (flat mount)
    if shared:
        @app.route(f"/{path}/{shared}/<filepath:path>")
        @app.route(f"/{shared}/<filepath:path>")
        def send_shared(filepath):
            file_path = gw.resource("work", "shared", filepath)
            if os.path.isfile(file_path):
                return static_file(os.path.basename(file_path), root=os.path.dirname(file_path))
            return HTTPResponse(status=404, body="shared file not found")

    # Serve static files (flat mount)
    if static:
        @app.route(f"/{path}/{static}/<filepath:path>")
        @app.route(f"/{static}/<filepath:path>")
        def send_static(filepath):
            file_path = gw.resource("data", "static", filepath)
            if os.path.isfile(file_path):
                return static_file(os.path.basename(file_path), root=os.path.dirname(file_path))
            return HTTPResponse(status=404, body="static file not found")
        
    # Main view dispatcher (only if views is not None)
    if views:
        @app.route(f"/{path}/<view:path>", method=["GET", "POST"])
        def view_dispatch(view):
            nonlocal home, views
            # --- AUTH CHECK ---
            if is_enabled('web.auth') and not gw.web.auth.is_authorized(strict=auth_required):
                return gw.web.error.unauthorized("Unauthorized: You are not permitted to view this page.")
            # Set current endpoint in GWAY context (for helpers/build_url etc)
            gw.context['current_endpoint'] = path
            segments = [s for s in view.strip("/").split("/") if s]
            view_name = segments[0].replace("-", "_") if segments else home
            args = segments[1:] if segments else []
            kwargs = dict(request.query)
            if request.method == "POST":
                try:
                    kwargs.update(request.json or dict(request.forms))
                except Exception as e:
                    return gw.web.error.redirect("Error loading JSON payload", error=e)

            target_func_name = f"{views}_{view_name}" if views else view_name

            view_func = getattr(source, target_func_name, None)
            if not callable(view_func):
                return gw.web.error.redirect(f"View not found: {target_func_name} in {project}")

            try:
                content = view_func(*args, **kwargs)
                if isinstance(content, HTTPResponse):
                    return content
                elif isinstance(content, bytes):
                    response.content_type = "application/octet-stream"
                    response.body = content
                    return response
                elif content is None:
                    return ""
                elif not isinstance(content, str):
                    content = gw.to_html(content)
            except HTTPResponse as res:
                return res
            except Exception as e:
                return gw.web.error.redirect("Broken view", err=e)

            media_origin = "/shared" if shared else ("static" if static else "")
            return render_template(
                title="GWAY - " + view_func.__name__.replace("_", " ").title(),
                content=content,
                css_files=(f"{media_origin}/{css}.css",),
                js_files=(f"{media_origin}/{js}.js",),
            )

    # API dispatcher (only if apis is not None)
    if apis:
        @app.route(f"/api/{path}/<view:path>", method=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])
        def api_dispatch(view):
            nonlocal home, apis
            # --- AUTH CHECK ---
            if is_enabled('web.auth') and not gw.web.auth.is_authorized(strict=auth_required):
                return gw.web.error.unauthorized("Unauthorized: API access denied.")
            # Set current endpoint in GWAY context (for helpers/build_url etc)
            gw.context['current_endpoint'] = path
            segments = [s for s in view.strip("/").split("/") if s]
            view_name = segments[0].replace("-", "_") if segments else home
            args = segments[1:] if segments else []
            kwargs = dict(request.query)
            if request.method == "POST":
                try:
                    kwargs.update(request.json or dict(request.forms))
                except Exception as e:
                    return gw.web.error.redirect("Error loading JSON payload", err=e)

            method = request.method.lower()
            specific_af = f"{apis}_{method}_{view_name}"
            generic_af = f"{apis}_{view_name}"

            api_func = getattr(source, specific_af, None)
            if not callable(api_func):
                api_func = getattr(source, generic_af, None)
            if not callable(api_func):
                return gw.web.error.redirect(f"API not found: {specific_af} or {generic_af} in {project}")

            try:
                result = api_func(*args, **kwargs)
                if isinstance(result, HTTPResponse):
                    return result
                response.content_type = "application/json"
                return gw.to_json(result)
            except HTTPResponse as res:
                return res
            except Exception as e:
                return gw.web.error.redirect("Broken API", err=e)

    @app.route("/favicon.ico")
    def favicon():
        proj_parts = project.split('.')
        candidate = gw.resource("data", "static", *proj_parts, "favicon.ico")
        if os.path.isfile(candidate):
            return static_file("favicon.ico", root=os.path.dirname(candidate))
        global_favicon = gw.resource("data", "static", "favicon.ico")
        if os.path.isfile(global_favicon):
            return static_file("favicon.ico", root=os.path.dirname(global_favicon))
        return HTTPResponse(status=404, body="favicon.ico not found")

    if gw.verbose:
        gw.debug(f"Registered homes: {_homes}")
        debug_routes(app)

    return oapp if oapp else app

Sample CLI

gway web.auth clear

Full Code

def clear():
    """
    Drop every registered auth challenge, leaving the registry empty.

    The registry is emptied in place, so other references to it see the
    change too. Intended for tests or for resetting the auth configuration.
    """
    _challenges.clear()
    return None

Sample CLI

gway web.auth config-basic

References

Full Code

def config_basic(*, allow='work/basic_auth.cdv', engine="auto", optional=False):
    """
    Add a basic-auth challenge backed by a CDV file of username/password pairs.

    In the CDV, the username is the key and the password is stored
    base64-encoded under the 'b64' field.

    Parameters:
        allow: Location of the CDV file holding the allowed users.
        engine: Engine selector handed to the challenge builder.
        optional: When True the challenge is registered as not required, so a
            failure only blocks when authorization is checked with strict=True.
    """
    is_required = not optional
    _challenges.append(
        Challenge(
            _basic_auth_challenge(allow, engine),
            required=is_required,
            name="basic_auth",
        )
    )
    label = "OPTIONAL" if optional else "REQUIRED"
    gw.info(f"[auth] Registered {label} basic auth challenge: allow='{allow}' engine='{engine}'")

Sample CLI

gway web.auth create-user

References

Full Code

def create_user(username, password, *, allow='work/basic_auth.cdv', overwrite=False, **fields):
    """
    Add a basic-auth user to the CDV file, or update one when overwrite=True.

    The password is stored base64-encoded under the 'b64' field. Any extra
    keyword arguments are written as additional fields of the same record.

    Raises:
        ValueError: If username or password is empty, or if the user already
            exists and overwrite is False.
    """
    if not (username and password):
        raise ValueError("Both username and password are required")

    # Existing users are only protected when the caller did not ask to overwrite.
    if not overwrite and username in gw.cdv.load_all(allow):
        raise ValueError(f"User '{username}' already exists in '{allow}' (set overwrite=True to update)")

    encoded = base64.b64encode(password.encode("utf-8")).decode("ascii")
    record = {"b64": encoded, **fields}
    gw.cdv.update(allow, username, **record)
    gw.info(f"[auth] Created/updated user '{username}' in '{allow}'")

Sample CLI

gway web.auth is-authorized

Full Code

def is_authorized(*, strict=False):
    """
    Evaluate the configured auth challenges in registration order.

    Each challenge is asked via check(strict=strict); evaluation stops at the
    first one that fails. With no challenges configured, access is allowed.
    - strict=True: ALL challenges (required/optional) must succeed.
    - strict=False: only required challenges must succeed; optional failures logged.

    Returns:
        True when every challenge check passes (or none exist), else False.
    """
    # all() is True for an empty registry and short-circuits on the first failure.
    return all(entry.check(strict=strict) for entry in _challenges)

Sample CLI

gway web.auth is-enabled

Full Code

def is_enabled():
    """
    Report whether at least one auth challenge has been registered.
    """
    return True if _challenges else False