Help for web.app

Sample CLI

gway vbox render-error

References

Full Code

def render_error(title: str, message: str, *, back_link: bool = True, target: str="uploads") -> str:
    """
    Build a small HTML error fragment.

    ``title`` and ``message`` are inserted verbatim (callers pass markup in
    ``message``). When ``back_link`` is true, a link to the ``target`` view,
    resolved through ``gw.web.app.build_url``, is appended.
    """
    pieces = [f"<h1>{title}</h1><p>{message}</p>"]
    if back_link:
        href = gw.web.app.build_url(target)
        pieces.append(f'<p class="error"><a href="{href}?">Back to {target} page</a></p>')
    return "".join(pieces)

Sample CLI

gway vbox view-uploads

References

Full Code

def view_uploads(*, vbid: str = None, timeout: int = 60, files: int = 4, email: str = None, **kwargs):
    """
    GET: Display upload interface or create a new upload box.
    POST: Handle uploaded files to a specific vbid.

    The request is handled in one of three ways:

    * POST with a ``vbid``: every part named ``file`` is saved into the box
      directory and a result page with upload/download links is returned.
    * No ``vbid`` (any method): a box id is derived from the client's address
      and User-Agent, the box is created or reused, its URL is logged, and a
      landing page with an optional e-mail request form is returned.
    * Otherwise (``vbid`` present, not a POST): the box is validated and the
      multi-file upload form is rendered.

    Args:
        vbid: Box identifier of the form ``short.long``.
        timeout: Lifetime of a newly created box, in minutes.
        files: Number of file inputs to render (at least one is always shown).
        email: Address submitted through the request form. The link is only
            mailed when it matches the ``ADMIN_EMAIL`` environment variable;
            the page shows the same confirmation either way.
        **kwargs: Extra request parameters; accepted and ignored.

    Returns:
        An HTML fragment as a string.
    """
    global _gc_thread_on
    # Lazily start the background purge thread the first time this view runs.
    if not _gc_thread_on:
        threading.Thread(target=periodic_purge, daemon=True).start()
        _gc_thread_on = True

    admin_email = os.environ.get("ADMIN_EMAIL")
    gw.info(f"Entry: vbid={vbid!r}, timeout={timeout}, files={files}, email={email!r}, method={request.method}")

    # Handle file upload (POST) with a vbid (the classic file upload case)
    if request.method == 'POST' and vbid:
        gw.info(f"POST file upload for vbid={vbid}")
        with _gc_lock:
            expire = _open_boxes.get(vbid)
            if not expire or expire < time.time():
                gw.warning(f"vbox expired for vbid={vbid}")
                return render_error("Upload Box Expired", "Please regenerate a new vbid.")

        try:
            short, _ = vbid.split(".", 1)
        except ValueError:
            gw.error(f"Invalid vbid format: {vbid}")
            return render_error("Invalid vbid format", "Expected form: <code>short.long</code>.")

        upload_dir = gw.resource(*VBOX_PATH, short)
        os.makedirs(upload_dir, exist_ok=True)

        uploaded_files = request.files.getlist("file")
        results = []
        for f in uploaded_files:
            # Defence in depth: the filename is client-supplied, so never let
            # it carry directory components into the save path.
            filename = os.path.basename(f.filename or "")
            save_path = os.path.join(upload_dir, filename)
            try:
                f.save(save_path)
                results.append(f"Uploaded {filename}")
                gw.info(f"Uploaded {filename} to {short}")
            except Exception as e:
                # Best effort: report the failure for this file and keep going.
                results.append(f"Error uploading {filename}: {e}")
                gw.error(f"Issue uploading {filename} to {short}")
                gw.exception(e)

        download_short_url = gw.web.app.build_url("download", vbid=short)
        download_long_url = gw.web.app.build_url("download", vbid=vbid)
        gw.info(f"Returning upload result UI for vbid={vbid}")
        return (
            "<pre>" + "\n".join(results) + "</pre>" +
            f"<p><a href='?vbid={vbid}'>UPLOAD MORE files to this box</a></p>" +
            f"<p><a href='{download_short_url}'>Go to PUBLIC READ-ONLY download page for this box</a></p>" +
            f"<p><a href='{download_long_url}'>Go to HIDDEN WRITE download page for this box</a></p>"
        )

    if not vbid:
        gw.info("No vbid present, always creating/checking box.")
        # The box id is a deterministic hash of the client's address + User-Agent.
        remote_addr = request.remote_addr or ''
        user_agent = request.headers.get('User-Agent') or ''
        identity = remote_addr + user_agent
        hash_digest = hashlib.sha256(identity.encode()).hexdigest()
        short = hash_digest[:12]
        full_id = f"{short}.{hash_digest[:40]}"

        with _gc_lock:
            now = time.time()
            expires = _open_boxes.get(full_id)
            if not expires or expires < now:
                _open_boxes[full_id] = now + timeout * 60
                os.makedirs(gw.resource(*VBOX_PATH, short), exist_ok=True)
                url = gw.build_url("uploads", vbid=full_id)
                message = f"[UPLOAD] Upload box created (expires in {timeout} min): {url}"
                print(("-" * 70) + '\n' + message + '\n' + ("-" * 70))
                gw.warning(message)
                gw.info(f"Created new box: {full_id}")
            else:
                # Same view name as the creation branch, so the link that may be
                # e-mailed below resolves to this upload view.
                url = gw.build_url("uploads", vbid=full_id)
                gw.info(f"Existing box reused: {full_id}")

        admin_notif = ""
        sent_copy_msg = "<p>A copy of the access URL was sent to the admin.</p>"
        if email:
            if admin_email and email.lower() == admin_email.strip().lower():
                subject = "Upload Box Link"
                body = (
                    f"A new upload box was created.\n\n"
                    f"Access URL: {url}\n\n"
                    f"This box will expire in {timeout} minutes."
                )
                try:
                    gw.mail.send(subject, body=body, to=admin_email)
                    gw.info("Sent upload URL email to admin.")
                except Exception as e:
                    gw.error(f"Error sending VBOX notification email: {e}")
                admin_notif = sent_copy_msg
            else:
                # Show the same confirmation for non-matching addresses so the
                # page does not reveal whether the address was the admin's.
                admin_notif = sent_copy_msg
                gw.info(f"Pretend email sent: {email!r} != {admin_email!r}")

        # Show the ready box UI + the optional email form
        email_form_html = (
            "<form method='POST'>"
            "<input type='email' name='email' required placeholder='Your email address'>"
            "<button type='submit'>Request Link</button>"
            "</form>"
        )
        form_message = (
            "<p>If you are a site member, you may request a URL to be sent to your email by entering it here.</p>"
        )

        local_console_info = ""
        if gw.web.server.is_local():
            local_console_info = (
                "<p>We've prepared an upload box for you. Check the console for the access URL.</p>"
                "<p>To use it, go to <code>?vbid=…</code> and upload your files there.</p>"
            )

        return (
            "<h1>Upload to Virtual Box</h1>"
            f"{local_console_info}"
            f"{admin_notif}"
            f"{form_message if not email else ''}{email_form_html if not email else ''}"
        )

    # Validate and show upload UI for an existing vbid
    gw.info(f"Render upload UI for vbid={vbid!r}")
    with _gc_lock:
        expire = _open_boxes.get(vbid)
        if not expire or expire < time.time():
            gw.warning(f"vbox expired for vbid={vbid}")
            return render_error("Upload Box Expired or Not Found", "Please regenerate a new vbid.")

    try:
        short, _ = vbid.split(".", 1)
    except ValueError:
        gw.error(f"Invalid vbid format: {vbid}")
        return render_error("Invalid vbid format", "Expected form: <code>short.long</code>.")

    # Generate N file input fields
    file_inputs = "\n".join(
        '<input type="file" name="file">' for _ in range(max(1, files))
    )

    download_url = gw.build_url("download", vbid=vbid)
    gw.info(f"Displaying upload form for {short}")

    return f"<h1>Upload to Box: {short}</h1>" + f"""
        <form method="POST" enctype="multipart/form-data">
            {file_inputs}
            <br><p><button type="submit">Upload</button></p>
        </form>
        <p>Files will be stored in <code>{'/'.join(VBOX_PATH)}/{short}/</code></p>
        <p><a href="{download_url}">Go to download page for this box</a></p>
    """

Sample CLI

gway web.app add-home

References

Full Code

def add_home(home, path):
    """
    Register ``home`` as a landing view under ``path``.

    The entry is stored in ``_homes`` as ``(title, route)``, where the title is
    the view name with dashes/underscores turned into spaces and title-cased,
    and the route is ``"{path}/{home}"``. Duplicate entries are not added.

    A falsy ``home`` (``None`` or ``""``) is ignored: ``setup()`` passes its
    ``home`` argument straight through, and that argument defaults to ``None``.
    """
    if not home:
        return
    title = home.replace('-', ' ').replace('_', ' ').title()
    route = f"{path}/{home}"
    entry = (title, route)
    if entry not in _homes:
        _homes.append(entry)
        gw.debug(f"Added home: ({title}, {route})")

Sample CLI

gway web.app build-url

Full Code

def build_url(*args, **kwargs):
    """
    Compose a URL path under the current endpoint.

    Truthy positional arguments become path segments (stringified, with
    surrounding slashes removed); keyword arguments become the query string.
    When no endpoint is set, the path is rooted at ``/``.
    """
    segments = [str(part).strip("/") for part in args if part]
    path = "/".join(segments)
    endpoint = current_endpoint()
    if not endpoint:
        url = f"/{path}"
    elif path:
        url = f"/{endpoint}/{path}"
    else:
        url = f"/{endpoint}"
    if not kwargs:
        return url
    return url + "?" + urlencode(kwargs)

Sample CLI

gway web.app current-endpoint

References

Full Code

def current_endpoint():
    """
    Return the endpoint path recorded for the current request.

    The value is read from the 'current_endpoint' key of gw.context;
    None is returned when that key is not set.
    """
    endpoint = gw.context.get('current_endpoint')
    return endpoint

Sample CLI

gway web.app debug-routes

References

Full Code

def debug_routes(app):
    """Write one debug log line per route of *app*: method, rule and callback name."""
    for entry in app.routes:
        line = f"{entry.method:6} {entry.rule:30} -> {entry.callback.__name__}"
        gw.debug(line)

Sample CLI

gway web.app default-home

Full Code

def default_home():
    """
    Return the first registered home route with a non-empty value, as an
    absolute path. Falls back to "/site/readme" when there is none.
    """
    first_route = next((route for _title, route in _homes if route), None)
    if first_route is None:
        return "/site/readme"
    return "/" + first_route.lstrip("/")

Sample CLI

gway web.app enabled-projects

Full Code

def enabled_projects():
    """Return a fresh set with every enabled web project (for static.collect, etc)."""
    return {*_enabled}

Sample CLI

gway web.app is-enabled

Full Code

def is_enabled(project_name):
    """Tell whether *project_name* is among the enabled web projects."""
    enabled = project_name in _enabled
    return enabled

Sample CLI

gway web.app render-template

References

Full Code

def render_template(*, title="GWAY", content="", css_files=None, js_files=None):
    """
    Wrap ``content`` in the site's HTML page shell and return the rendered page.

    Args:
        title: Text for the ``<title>`` element (inserted unescaped).
        content: HTML placed inside ``<main>`` (inserted unescaped).
        css_files: Stylesheet URL(s); normalised with ``gw.to_list``.
        js_files: Script URL(s); normalised with ``gw.to_list``.

    When the ``web.nav`` project is enabled, its current theme stylesheet is
    added to the CSS list (right after a leading ``global.css`` entry if there
    is one, otherwise first) and its navigation markup is rendered.
    """
    global _ver
    version = _ver = _ver or gw.version()

    # --- MAIN: THEME CSS HANDLING ---
    theme_css = None
    if is_enabled('web.nav'):
        try:
            theme_css = gw.web.nav.get_style()
        except Exception:
            # Best effort: a broken theme lookup must not break the page.
            theme_css = "/static/styles/base.css"
    css_files = gw.to_list(css_files)
    if theme_css and theme_css not in css_files:
        idx = 1 if css_files and 'global.css' in css_files[0] else 0
        css_files.insert(idx, theme_css)
    css_links = "".join(
        f'<link rel="stylesheet" href="{href}">\n' for href in (css_files or ())
    )

    js_files = gw.to_list(js_files)
    js_links = "".join(
        f'<script src="{src}"></script>\n' for src in (js_files or ())
    )

    favicon = '<link rel="icon" href="/favicon.ico" type="image/x-icon" />'
    credits_html = '''
        <p>GWAY is written in <a href="https://www.python.org/">Python 3.11</a>.
        Hosting by <a href="https://www.gelectriic.com/">Gelectriic Solutions</a>, 
        <a href="https://pypi.org">PyPI</a> and <a href="https://github.com/arthexis/gway">Github</a>.</p>
    '''

    nav = ""
    if is_enabled('web.nav'):
        nav = gw.web.nav.render(homes=_homes)

    # Pass only the names the template uses instead of **locals(), so local
    # renames cannot silently change what the template sees.
    html = template("""<!DOCTYPE html>
        <html lang="en">
        <head>
            <meta charset="UTF-8" />
            <title>{{!title}}</title>
            {{!css_links}}
            {{!favicon}}
            <meta name="viewport" content="width=device-width, initial-scale=1.0" />
        </head>
        <body>
            <div class="page-wrap">
                <div class="layout">
                    {{!nav}}<main>{{!content}}</main>
                </div>
                <footer><p>This website was <strong>built</strong>, <strong>tested</strong> 
                    and <strong>released</strong> with <a href="https://arthexis.com">GWAY</a> 
                    <a href="https://pypi.org/project/gway/{{!version}}/">v{{!version}}</a>.</p>
                    {{!credits}}
                </footer>
            </div>
            {{!js_links}}
        </body>
        </html>
    """,
        title=title,
        content=content,
        css_links=css_links,
        js_links=js_links,
        favicon=favicon,
        credits=credits_html,
        nav=nav,
        version=version,
    )
    return html

Sample CLI

gway web.app setup

References

Full Code

def setup(*,
    app=None,
    project="web.site",
    path=None,
    home: str = None,
    views: str = "view", 
    apis: str = "api",
    static="static",
    shared="shared",
    css="global",           # Default CSS (without .css extension)
    js="global",            # Default JS  (without .js extension)
    auth_required=False,    # Default: Don't enforce --optional security
    engine="bottle",
):
    """
    Setup Bottle web application with symmetrical static/shared public folders.
    Only one project per app. CSS/JS params are used as the only static includes.

    Args:
        app: Existing app (or wrapper around one) to extend; a new Bottle app
            is created when none is found.
        project: Dotted project name whose functions serve as views/APIs.
        path: URL prefix for the project. Defaults to the project name with
            dots turned into slashes and a leading ``web/`` removed.
        home: Optional landing view name, registered through ``add_home``.
        views: Function-name prefix for page views (``{views}_{name}``);
            falsy disables the view dispatcher.
        apis: Function-name prefix for API handlers (``{apis}_{method}_{name}``
            with ``{apis}_{name}`` as fallback); falsy disables the dispatcher.
        static: URL mount name for files under ``data/static``; falsy disables.
        shared: URL mount name for files under ``work/shared``; falsy disables.
        css: Base name of the stylesheet linked on every rendered page.
        js: Base name of the script linked on every rendered page.
        auth_required: Passed as ``strict`` to ``gw.web.auth.is_authorized``
            when the ``web.auth`` project is enabled.
        engine: Only ``"bottle"`` is supported.

    Returns:
        The ``app`` argument as given when it was truthy, else the new app.

    Raises:
        NotImplementedError: If ``engine`` is not ``"bottle"``.
    """
    global _ver, _homes, _enabled

    if engine != "bottle":
        raise NotImplementedError("Only Bottle is supported at the moment.")

    _ver = _ver or gw.version()
    bottle.BaseRequest.MEMFILE_MAX = UPLOAD_MB * 1024 * 1024

    if not isinstance(project, str) or not project:
        gw.abort("Project must be a non-empty string.")

    # Track project for later global static collection
    _enabled.add(project)

    # Always use the given project, never a list
    try:
        source = gw[project]
    except Exception:
        gw.abort(f"Project {project} not found in Gateway during app setup.")

    # Default path is the dotted project name, minus any leading web/
    if path is None:
        path = project.replace('.', '/')
        if path.startswith('web/'):
            path = path.removeprefix('web/')

    # `oapp` keeps the caller's original argument (returned at the end);
    # `app` becomes the unwrapped Bottle instance, or None when nothing was given.
    is_new_app = not (app := gw.unwrap_one(app, Bottle) if (oapp := app) else None)
    if is_new_app:
        gw.info("No Bottle app found; creating a new Bottle app.")
        app = Bottle()
        _homes.clear()

        @app.route("/", method=["GET", "POST"])
        def index():
            response.status = 302
            response.set_header("Location", default_home())
            return ""

        @app.error(404)
        def handle_404(error):
            return gw.web.error.redirect(f"404 Not Found: {request.url}", err=error)

    # `home` is optional (defaults to None); only register it when provided.
    if home:
        add_home(home, path)

    # Serve shared files (flat mount)
    if shared:
        @app.route(f"/{path}/{shared}/<filepath:path>")
        @app.route(f"/{shared}/<filepath:path>")
        def send_shared(filepath):
            # Hand static_file the real root directory so its containment check
            # rejects request paths that climb out of it (e.g. "../").
            root = gw.resource("work", "shared")
            if os.path.isfile(os.path.join(root, filepath)):
                return static_file(filepath, root=root)
            return HTTPResponse(status=404, body="shared file not found")

    # Serve static files (flat mount)
    if static:
        @app.route(f"/{path}/{static}/<filepath:path>")
        @app.route(f"/{static}/<filepath:path>")
        def send_static(filepath):
            # Same containment reasoning as send_shared.
            root = gw.resource("data", "static")
            if os.path.isfile(os.path.join(root, filepath)):
                return static_file(filepath, root=root)
            return HTTPResponse(status=404, body="static file not found")

    # Main view dispatcher (only if views is not None)
    if views:
        @app.route(f"/{path}/<view:path>", method=["GET", "POST"])
        def view_dispatch(view):
            # --- AUTH CHECK ---
            if is_enabled('web.auth') and not gw.web.auth.is_authorized(strict=auth_required):
                return gw.web.error.unauthorized("Unauthorized: You are not permitted to view this page.")
            # Set current endpoint in GWAY context (for helpers/build_url etc)
            gw.context['current_endpoint'] = path
            segments = [s for s in view.strip("/").split("/") if s]
            view_name = segments[0].replace("-", "_") if segments else home
            args = segments[1:] if segments else []
            kwargs = dict(request.query)
            if request.method == "POST":
                try:
                    kwargs.update(request.json or dict(request.forms))
                except Exception as e:
                    # redirect() takes the exception as `err`.
                    return gw.web.error.redirect("Error loading JSON payload", err=e)

            target_func_name = f"{views}_{view_name}" if views else view_name

            view_func = getattr(source, target_func_name, None)
            if not callable(view_func):
                return gw.web.error.redirect(f"View not found: {target_func_name} in {project}")

            try:
                content = view_func(*args, **kwargs)
                if isinstance(content, HTTPResponse):
                    return content
                elif isinstance(content, bytes):
                    response.content_type = "application/octet-stream"
                    response.body = content
                    return response
                elif content is None:
                    return ""
                elif not isinstance(content, str):
                    content = gw.to_html(content)
            except HTTPResponse as res:
                return res
            except Exception as e:
                return gw.web.error.redirect("Broken view", err=e)

            # Absolute URLs matching the mounts registered above.
            media_origin = f"/{shared}" if shared else (f"/{static}" if static else "")
            return render_template(
                title="GWAY - " + view_func.__name__.replace("_", " ").title(),
                content=content,
                css_files=(f"{media_origin}/{css}.css",),
                js_files=(f"{media_origin}/{js}.js",),
            )

    # API dispatcher (only if apis is not None)
    if apis:
        @app.route(f"/api/{path}/<view:path>", method=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])
        def api_dispatch(view):
            # --- AUTH CHECK ---
            if is_enabled('web.auth') and not gw.web.auth.is_authorized(strict=auth_required):
                return gw.web.error.unauthorized("Unauthorized: API access denied.")
            # Set current endpoint in GWAY context (for helpers/build_url etc)
            gw.context['current_endpoint'] = path
            segments = [s for s in view.strip("/").split("/") if s]
            view_name = segments[0].replace("-", "_") if segments else home
            args = segments[1:] if segments else []
            kwargs = dict(request.query)
            if request.method == "POST":
                try:
                    kwargs.update(request.json or dict(request.forms))
                except Exception as e:
                    return gw.web.error.redirect("Error loading JSON payload", err=e)

            # Prefer a method-specific handler, then fall back to the generic one.
            method = request.method.lower()
            specific_af = f"{apis}_{method}_{view_name}"
            generic_af = f"{apis}_{view_name}"

            api_func = getattr(source, specific_af, None)
            if not callable(api_func):
                api_func = getattr(source, generic_af, None)
            if not callable(api_func):
                return gw.web.error.redirect(f"API not found: {specific_af} or {generic_af} in {project}")

            try:
                result = api_func(*args, **kwargs)
                if isinstance(result, HTTPResponse):
                    return result
                response.content_type = "application/json"
                return gw.to_json(result)
            except HTTPResponse as res:
                return res
            except Exception as e:
                return gw.web.error.redirect("Broken API", err=e)

    @app.route("/favicon.ico")
    def favicon():
        # Project-specific icon first, then the global one.
        proj_parts = project.split('.')
        candidate = gw.resource("data", "static", *proj_parts, "favicon.ico")
        if os.path.isfile(candidate):
            return static_file("favicon.ico", root=os.path.dirname(candidate))
        global_favicon = gw.resource("data", "static", "favicon.ico")
        if os.path.isfile(global_favicon):
            return static_file("favicon.ico", root=os.path.dirname(global_favicon))
        return HTTPResponse(status=404, body="favicon.ico not found")

    if gw.verbose:
        gw.debug(f"Registered homes: {_homes}")
        debug_routes(app)

    return oapp if oapp else app

Sample CLI

gway web.app static-file

Full Code

def static_file(filename, root,
                mimetype=True,
                download=False,
                charset='UTF-8',
                etag=None,
                headers=None):
    """ Open a file in a safe way and return an instance of :exc:`HTTPResponse`
        that can be sent back to the client.

        :param filename: Name or path of the file to send, relative to ``root``.
        :param root: Root path for file lookups. Should be an absolute directory
            path.
        :param mimetype: Provide the content-type header (default: guess from
            file extension)
        :param download: If True, ask the browser to open a `Save as...` dialog
            instead of opening the file with the associated program. You can
            specify a custom filename as a string. If not specified, the
            original filename is used (default: False).
        :param charset: The charset for files with a ``text/*`` mime-type.
            (default: UTF-8)
        :param etag: Provide a pre-computed ETag header. If set to ``False``,
            ETag handling is disabled. (default: auto-generate ETag header)
        :param headers: Additional headers dict to add to the response.

        While checking user input is always a good idea, this function provides
        additional protection against malicious ``filename`` parameters from
        breaking out of the ``root`` directory and leaking sensitive information
        to an attacker.

        Read-protected files or files outside of the ``root`` directory are
        answered with ``403 Access Denied``. Missing files result in a
        ``404 Not Found`` response. Conditional requests (``If-Modified-Since``,
        ``If-None-Match``) are answered with ``304 Not Modified`` whenever
        possible. ``HEAD`` and ``Range`` requests (used by download managers to
        check or continue partial downloads) are also handled automatically.
    """

    # Trailing separator on root makes the startswith() check below reject
    # sibling directories that merely share a name prefix.
    root = os.path.join(os.path.abspath(root), '')
    filename = os.path.abspath(os.path.join(root, filename.strip('/\\')))
    headers = headers.copy() if headers else {}
    getenv = request.environ.get

    if not filename.startswith(root):
        return HTTPError(403, "Access denied.")
    if not os.path.exists(filename) or not os.path.isfile(filename):
        return HTTPError(404, "File does not exist.")
    if not os.access(filename, os.R_OK):
        return HTTPError(403, "You do not have permission to access this file.")

    if mimetype is True:
        name = download if isinstance(download, str) else filename
        mimetype, encoding = mimetypes.guess_type(name)
        if encoding == 'gzip':
            mimetype = 'application/gzip'
        elif encoding: # e.g. bzip2 -> application/x-bzip2
            mimetype = 'application/x-' + encoding

    if charset and mimetype and 'charset=' not in mimetype \
        and (mimetype[:5] == 'text/' or mimetype == 'application/javascript'):
        mimetype += '; charset=%s' % charset

    if mimetype:
        headers['Content-Type'] = mimetype

    if download is True:
        download = os.path.basename(filename)

    if download:
        download = download.replace('"','')
        headers['Content-Disposition'] = 'attachment; filename="%s"' % download

    stats = os.stat(filename)
    headers['Content-Length'] = clen = stats.st_size
    headers['Last-Modified'] = email.utils.formatdate(stats.st_mtime, usegmt=True)
    headers['Date'] = email.utils.formatdate(time.time(), usegmt=True)

    if etag is None:
        etag = '%d:%d:%d:%d:%s' % (stats.st_dev, stats.st_ino, stats.st_mtime,
                                   clen, filename)
        etag = hashlib.sha1(tob(etag)).hexdigest()

    if etag:
        headers['ETag'] = etag
        check = getenv('HTTP_IF_NONE_MATCH')
        if check and check == etag:
            return HTTPResponse(status=304, **headers)

    ims = getenv('HTTP_IF_MODIFIED_SINCE')
    if ims:
        ims = parse_date(ims.split(";")[0].strip())
        if ims is not None and ims >= int(stats.st_mtime):
            return HTTPResponse(status=304, **headers)

    headers["Accept-Ranges"] = "bytes"
    is_head = request.method == 'HEAD'
    range_header = getenv('HTTP_RANGE')
    if range_header:
        # Validate the range BEFORE opening the file, so the 416 path does not
        # return with an open file handle that nothing closes.
        ranges = list(parse_range_header(range_header, clen))
        if not ranges:
            return HTTPError(416, "Requested Range Not Satisfiable")
        offset, end = ranges[0]
        rlen = end - offset
        headers["Content-Range"] = "bytes %d-%d/%d" % (offset, end - 1, clen)
        headers["Content-Length"] = str(rlen)
        body = '' if is_head else open(filename, 'rb')
        if body: body = _closeiter(_rangeiter(body, offset, rlen), body.close)
        return HTTPResponse(body, status=206, **headers)

    # HEAD responses carry headers only; otherwise the open file is the body.
    body = '' if is_head else open(filename, 'rb')
    return HTTPResponse(body, **headers)

Sample CLI

gway web.app template

Full Code

def template(*args, **kwargs):
    """
    Render a template and return whatever the adapter's ``render`` produces.

    The first positional argument is a template name, a template source
    string, or an already-built adapter instance. Any further positional
    arguments are dictionaries merged into the rendering variables, which can
    also be given as keyword arguments. The reserved keywords
    ``template_adapter``, ``template_lookup`` and ``template_settings`` select
    the adapter class, the lookup path and the adapter settings.
    """
    tpl = args[0] if args else None
    for extra_vars in args[1:]:
        kwargs.update(extra_vars)
    adapter = kwargs.pop('template_adapter', SimpleTemplate)
    lookup = kwargs.pop('template_lookup', TEMPLATE_PATH)
    cache_key = (id(lookup), tpl)
    if cache_key not in TEMPLATES or DEBUG:
        settings = kwargs.pop('template_settings', {})
        if isinstance(tpl, adapter):
            # Already an adapter instance: cache it as-is, then apply settings.
            TEMPLATES[cache_key] = tpl
            if settings:
                TEMPLATES[cache_key].prepare(**settings)
        elif any(marker in tpl for marker in ("\n", "{", "%", "$")):
            # Contains template syntax characters: treat it as inline source.
            TEMPLATES[cache_key] = adapter(source=tpl, lookup=lookup, **settings)
        else:
            TEMPLATES[cache_key] = adapter(name=tpl, lookup=lookup, **settings)
    compiled = TEMPLATES[cache_key]
    if not compiled:
        abort(500, 'Template (%s) not found' % tpl)
    return compiled.render(kwargs)

Sample CLI

gway web.app urlencode

Full Code

def urlencode(query, doseq=False, safe='', encoding=None, errors=None,
              quote_via=quote_plus):
    """Turn a mapping or a sequence of 2-tuples into a URL query string.

    With ``doseq`` true, a value that is a (non-string) sequence yields one
    ``key=value`` pair per element; otherwise every value is stringified as a
    whole. Pairs appear in the output in the order the input yields them.

    Keys and values may be ``str`` or ``bytes``. ``safe``, ``encoding`` and
    ``errors`` are forwarded to ``quote_via`` (the latter two only for
    non-bytes components).

    Raises TypeError when ``query`` is neither a mapping nor a sequence of
    tuples.
    """
    if hasattr(query, "items"):
        query = query.items()
    else:
        # Strings are sequences too, so reject anything whose first element
        # is not a tuple. Objects without len()/indexing raise TypeError on
        # their own and are reported the same way.
        try:
            if len(query) and not isinstance(query[0], tuple):
                raise TypeError
        except TypeError as err:
            raise TypeError("not a valid non-string sequence "
                            "or mapping object") from err

    def _quote(part):
        # bytes are quoted as-is; everything else is stringified first.
        if isinstance(part, bytes):
            return quote_via(part, safe)
        return quote_via(str(part), safe, encoding, errors)

    encoded = []
    for key, value in query:
        key = _quote(key)
        if not doseq or isinstance(value, bytes):
            encoded.append(key + '=' + _quote(value))
        elif isinstance(value, str):
            encoded.append(key + '=' + quote_via(value, safe, encoding, errors))
        else:
            try:
                # Anything with a length is treated as a sequence of values.
                len(value)
            except TypeError:
                encoded.append(key + '=' + _quote(value))
            else:
                for item in value:
                    encoded.append(key + '=' + _quote(item))
    return '&'.join(encoded)

Sample CLI

gway web.error redirect

References

Full Code

def redirect(message="", *, err=None, default=None, view_name=None):
    """
    GWAY error/redirect handler.

    Removes the failing view from the "visited" cookie (when cookie consent
    was given), then either renders the debug error page or answers with a
    302 redirect to ``default`` or the default home.

    Deprecated: 'view_name'. Now uses gw.web.app.current_endpoint.
    """
    from bottle import request, response

    # NOTE(review): elsewhere in this file `gw.debug` is called as a logging
    # function; if it is a plain method this flag is always True — confirm
    # which attribute is meant to carry the debug switch.
    debug_enabled = bool(getattr(gw, "debug", False))
    visited_raw = gw.web.cookies.get("visited", "")
    visited_items = visited_raw.split("|") if visited_raw else []

    # --- DEPRECATED: view_name, use gw.web.app.current_endpoint instead ---
    if view_name is not None:
        import warnings
        warnings.warn(
            "redirect(): 'view_name' is deprecated. Use gw.web.app.current_endpoint instead.",
            DeprecationWarning
        )
    endpoint_attr = getattr(gw.web.app, "current_endpoint", None)
    view_key = endpoint_attr() if callable(endpoint_attr) else endpoint_attr
    if not view_key and view_name:
        view_key = view_name

    if view_key and gw.web.cookies.check_consent():
        broken_title = (view_key or "").replace("-", " ").replace("_", " ").title().lower()
        # Entries look like "title=..."; keep those whose title is not the broken view.
        kept = [
            item for item in visited_items
            if item.split("=", 1)[0].strip().lower() != broken_title
        ]
        if len(kept) != len(visited_items):
            gw.web.cookies.set("visited", "|".join(kept))

    if not debug_enabled:
        response.status = 302
        response.set_header("Location", default or gw.web.app.default_home())
        return ""

    return view_debug_error(
        title="GWAY Debug Error",
        message=message,
        err=err,
        status=500,
        default=default
    )

Sample CLI

gway web.error view-debug-error

References

Full Code

def view_debug_error(
    *,
    title="GWAY Debug Error",
    message="An error occurred.",
    err=None,
    status=500,
    default=None
):
    """
    Build a standalone HTML debug page for an error.

    The page shows the message, the error, request details (path, method,
    URL, query, form, headers, cookies) and the formatted traceback, all
    HTML-escaped, plus a link back to ``default`` or the default home.
    Sets the response status to ``status`` and the content type to text/html.
    """
    from bottle import request, response
    import traceback
    import html

    if err:
        trace_lines = traceback.format_exception(type(err), err, getattr(err, "__traceback__", None))
        trace_text = "".join(trace_lines)
    else:
        trace_text = ""

    # Escape every dynamic value up front, in the order it appears on the page.
    esc = html.escape
    safe_title = esc(title)
    safe_message = esc(str(message) or "")
    safe_error = esc(str(err) or "")
    safe_path = esc(request.path or "")
    safe_method = esc(request.method or "")
    safe_url = esc(request.url or "")
    safe_query = esc(str(dict(request.query)) or "")
    safe_form = esc(str(getattr(request, "forms", "")) or "")
    safe_headers = esc(str(dict(request.headers)) or "")
    safe_cookies = esc(str(dict(request.cookies)) or "")
    safe_trace = esc(trace_text or '(no traceback)')
    safe_home = esc(default or gw.web.app.default_home())

    debug_content = f"""
    <html>
    <head>
        <title>{safe_title}</title>
        <style>
            body {{ font-family: monospace, sans-serif; background: #23272e; color: #e6e6e6; }}
            .traceback {{ background: #16181c; color: #ff8888; padding: 1em; border-radius: 5px; margin: 1em 0; white-space: pre; }}
            .kv {{ color: #6ee7b7; }}
            .section {{ margin-bottom: 2em; }}
            h1 {{ color: #ffa14a; }}
            a {{ color: #69f; }}
            .copy-btn {{ margin: 1em 0; background:#333;color:#fff;padding:0.4em 0.8em;border-radius:4px;cursor:pointer;border:1px solid #aaa; }}
        </style>
    </head>
    <body>
        <h1>{safe_title}</h1>
        <div id="debug-content">
            <div class="section"><b>Message:</b> {safe_message}</div>
            <div class="section"><b>Error:</b> {safe_error}</div>
            <div class="section"><b>Path:</b> {safe_path}<br>
                                 <b>Method:</b> {safe_method}<br>
                                 <b>Full URL:</b> {safe_url}</div>
            <div class="section"><b>Query:</b> {safe_query}</div>
            <div class="section"><b>Form:</b> {safe_form}</div>
            <div class="section"><b>Headers:</b> {safe_headers}</div>
            <div class="section"><b>Cookies:</b> {safe_cookies}</div>
            <div class="section"><b>Traceback:</b>
                <div class="traceback">{safe_trace}</div>
            </div>
        </div>
        <div><a href="{safe_home}">&#8592; Back to home</a></div>
    </body>
    </html>
    """
    response.status = status
    response.content_type = "text/html"
    return debug_content

Sample CLI

gway web.nav get-style

References

Full Code

def get_style():
    """
    Resolve the stylesheet path (to a .css file) to use for the current request.

    Candidates are tried in this order and the first hit wins:
    - URL ?css= param (for preview/testing), if it names a known style
    - 'css' cookie (only read when 'web.cookies' is enabled), if it names a known style
    - the first style returned by list_styles()
    - '/static/styles/base.css' when no style is available at all
    This should be called by render_template for every page load.
    """
    available = list_styles()
    cookie_choice = gw.web.cookies.get("css") if gw.web.app.is_enabled('web.cookies') else None
    query_choice = request.query.get("css")

    def to_href(source, filename):
        # "global" styles sit directly under /static/styles; any other source has its own folder.
        if source == "global":
            return f"/static/styles/{filename}"
        return f"/static/{source}/styles/{filename}"

    # The query parameter outranks the cookie; a requested name only counts if it is a known style.
    for requested in (query_choice, cookie_choice):
        if not requested:
            continue
        for source, filename in available:
            if filename == requested:
                return to_href(source, filename)

    # Nothing (valid) was requested: first known style, otherwise the base stylesheet.
    if available:
        source, filename = available[0]
        return to_href(source, filename)
    return "/static/styles/base.css"

Sample CLI

gway web.nav render

References

Full Code

def render(*, homes=None):
    """
    Renders the sidebar navigation including search, home links, visited links, and a QR compass.

    homes: optional sequence of (title, route) pairs; they are rendered first, upper-cased.
    Visited links are taken from the "visited" cookie ("title=route" entries separated
    by "|") and are only read and rendered when cookies are enabled and consent was given.
    Returns the <aside> HTML fragment as a string.

    Values that originate from the request (the ?topic= query value, the current path and
    the visited cookie) are HTML-escaped before being interpolated because they are
    client-controlled. Home titles/routes are emitted exactly as supplied by the caller.
    """
    from html import escape  # function-scope import so the block does not rely on module imports

    cookies_ok = gw.web.app.is_enabled('web.cookies') and gw.web.cookies.check_consent()
    gw.debug(f"Render nav with {homes=} {cookies_ok=}")

    visited = []
    if cookies_ok:
        visited_cookie = gw.web.cookies.get("visited", "")
        if visited_cookie:
            visited = visited_cookie.split("|")

    current_route = request.fullpath.strip("/")
    # Title is the last path segment, prettified; an empty path is shown as "Readme".
    current_title = (current_route.split("/")[-1] or "readme").replace('-', ' ').replace('_', ' ').title()

    # De-duplicate cookie entries by canonical (slash-stripped) route, keeping first occurrence.
    visited_set = set()
    entries = []
    for entry in visited:
        if "=" not in entry:
            continue
        title, route = entry.split("=", 1)
        canon_route = route.strip("/")
        if canon_route not in visited_set:
            entries.append((title, canon_route))
            visited_set.add(canon_route)

    home_routes = set()
    if homes:
        for home_title, home_route in homes:
            home_routes.add(home_route.strip("/"))
    # The page being viewed counts as visited unless it is a home link or already listed.
    if cookies_ok and current_route not in home_routes and current_route not in visited_set:
        entries.append((current_title, current_route))
        visited_set.add(current_route)

    # --- Build HTML links ---
    link_items = []
    if homes:
        for home_title, home_route in homes:
            route = home_route.strip("/")
            is_current = ' class="current"' if route == current_route else ""
            link_items.append(f'<li><a href="/{home_route}"{is_current}>{home_title.upper()}</a></li>')
    if cookies_ok and entries:
        visited_rendered = set()
        # Most recent entries first.
        for title, route in reversed(entries):
            if route in home_routes or route in visited_rendered:
                continue
            visited_rendered.add(route)
            is_current = ' class="current"' if route == current_route else ""
            # Cookie/path derived values: escape so they cannot break out of the markup.
            link_items.append(
                f'<li><a href="/{escape(route, quote=True)}"{is_current}>{escape(title)}</a></li>'
            )
    elif not homes:
        link_items.append(f'<li class="current">{escape(current_title.upper())}</li>')
    links = "".join(link_items)

    # --- Search box ---
    # The topic is echoed back into the textarea, so it must be escaped (reflected XSS otherwise).
    topic = escape(request.query.get("topic", "") or "")
    search_box = '''
        <form action="/site/help" method="get" class="nav">
            <textarea name="topic" id="help-search"
                placeholder="Search this GWAY"
                class="help" rows="1"
                autocomplete="off"
                spellcheck="false"
                style="overflow:hidden; resize:none; min-height:2.4em; max-height:10em;"
                oninput="autoExpand(this)"
            >{}</textarea>
        </form>
    '''.format(topic)

    # --- QR code for this page ---
    compass = ""
    try:
        url = get_current_url()
        qr_url = gw.qr.generate_url(url)
        compass = f'''
            <div class="compass">
                <img src="{qr_url}" alt="QR Code" class="compass" />
            </div>
        '''
    except Exception as e:
        # Best effort: the sidebar is still rendered without the compass.
        gw.debug(f"Could not generate QR compass: {e}")

    gw.debug(f"Visited cookie raw: {gw.web.cookies.get('visited')}")
    return f"<aside>{search_box}<ul>{links}</ul><br>{compass}</aside>"

Sample CLI

gway web.nav view-style-switcher

References

Full Code

def view_style_switcher(*, css=None, project=None):
    """
    Shows available styles (global + project), lets user choose, preview, and see raw CSS.
    If cookies are accepted, sets the style via cookie when changed in dropdown.
    If cookies are not accepted, only uses the css param for preview.

    css: optional style file name to preselect (ranked below the ?css= query value).
    project: project whose styles folder is listed next to the global one; when omitted
        it is taken from the first path segment if that is one of the known project
        names, otherwise "site".
    Returns the page HTML, or "" after a successful POST (a 303 redirect back to the
    same path is set on the response).
    """
    import os
    from bottle import request, response

    # Determine the project from context or fallback if not provided
    if not project:
        first_segment = request.fullpath.strip("/").split("/")[0]
        project = first_segment if first_segment in ("conway", "awg", "site", "etron") else "site"

    def list_styles_local(project):
        """Return (source, filename) pairs: global styles first, then project-only ones."""
        # NOTE(review): project is used as a path component below; confirm callers
        # cannot pass untrusted values here.
        locations = [("global", gw.resource("data", "static", "styles"))]
        if project:
            locations.append((project, gw.resource("data", "static", project, "styles")))
        seen = set()
        styles = []
        for source, folder in locations:
            if not os.path.isdir(folder):
                continue
            for f in sorted(os.listdir(folder)):
                # A name already provided by an earlier location wins.
                if f.endswith(".css") and f not in seen and os.path.isfile(os.path.join(folder, f)):
                    styles.append((source, f))
                    seen.add(f)
        return styles

    styles = list_styles_local(project)
    all_styles = [fname for _, fname in styles]
    style_sources = {fname: src for src, fname in styles}

    cookies_enabled = gw.web.app.is_enabled('web.cookies')
    cookies_accepted = gw.web.cookies.check_consent() if cookies_enabled else False
    # Only read the cookie when the cookies project is enabled, mirroring get_style().
    css_cookie = gw.web.cookies.get("css") if cookies_enabled else None

    # Handle POST
    if request.method == "POST":
        posted_style = request.forms.get("css")
        if cookies_enabled and cookies_accepted and posted_style and posted_style in all_styles:
            gw.web.cookies.set("css", posted_style)
            response.status = 303
            response.set_header("Location", request.fullpath)
            return ""

    # --- THIS IS THE MAIN LOGIC: ---
    # Priority: query param > explicit function arg > cookie > default
    default_style = all_styles[0] if all_styles else "base.css"
    selected_style = next(
        (name for name in (request.query.get("css"), css, css_cookie) if name in all_styles),
        default_style,
    )

    # Determine stylesheet URL and on-disk path for raw CSS
    if style_sources.get(selected_style) == "global":
        css_href = f"/static/styles/{selected_style}"
        css_path = gw.resource("data", "static", "styles", selected_style)
    else:
        css_href = f"/static/{project}/styles/{selected_style}"
        css_path = gw.resource("data", "static", project, "styles", selected_style)

    # Escape everything interpolated into markup: file names come from disk and
    # project may come from the caller.
    safe_style = html_escape(selected_style)
    safe_title = html_escape(selected_style[:-4].replace('_', ' ').title())
    css_link = f'<link rel="stylesheet" href="{html_escape(css_href)}">'

    preview_html = f"""
        {css_link}
        <div class="style-preview">
            <h2>Theme Preview: {safe_title}</h2>
            <p>This is a preview of the <b>{safe_style}</b> theme.</p>
            <button>Sample button</button>
            <pre>code block</pre>
        </div>
    """
    try:
        with open(css_path, encoding="utf-8") as f:
            css_code = f.read()
    except (OSError, UnicodeError):
        # Best effort: the page still renders when the file is missing or unreadable.
        css_code = "Could not load CSS file."

    selector = style_selector_form(
        all_styles=styles,
        selected_style=selected_style,
        cookies_enabled=cookies_enabled,
        cookies_accepted=cookies_accepted,
        project=project
    )

    return f"""
        <h1>Select a Site Theme</h1>
        {selector}
        {preview_html}
        <h3>CSS Source: {safe_style}</h3>
        <pre style="max-height:400px;overflow:auto;">{html_escape(css_code)}</pre>
    """

Sample CLI

gway web.server start-app

References

Full Code

def start_app(*,
    host="[WEBSITE_HOST|127.0.0.1]",
    port="[WEBSITE_PORT|8888]",
    ws_port="[WEBSOCKET_PORT|9000]",
    debug=False,
    proxy=None,
    app=None,
    daemon=True,
    threaded=True,
    is_worker=False,
    workers=None,
):
    """
    Start an HTTP (WSGI) or ASGI server to host the given application.

    - If `app` is a FastAPI instance, runs with Uvicorn (on ws_port if set, else port).
    - If `app` is a WSGI app, serves it with Paste's httpserver when installed,
      otherwise with Bottle's own runner (Bottle apps only).
    - If `app` is a zero-arg factory, it will be invoked (supporting sync or async factories).
    - If `app` is a list of apps, each will be run in its own thread (each on an incremented
      port; the first FastAPI app uses ws_port if set).
    - If no app is given, the default app is built with gw.web.app.setup().

    host, port and ws_port are passed through gw.resolve() when given as strings.
    workers is used as Paste's threadpool size (default 5); for ASGI apps it is ignored,
    see the note in the Uvicorn branch.

    Returns the coroutine produced by asyncio.to_thread(run_server) when `daemon` is true
    (the caller is responsible for awaiting/scheduling it); otherwise blocks in the
    server and returns None.

    Raises RuntimeError if an ASGI app is given but uvicorn is not installed, and
    TypeError if a non-Bottle WSGI app is given while Paste is unavailable.
    """
    import inspect
    import asyncio

    host = gw.resolve(host) if isinstance(host, str) else host
    port = gw.resolve(port) if isinstance(port, str) else port
    ws_port = gw.resolve(ws_port) if isinstance(ws_port, str) else ws_port

    def run_server():
        nonlocal app
        global _default_app_build_count
        all_apps = app if iterable(app) else (app, )

        # ---- Multi-app mode ----
        if not is_worker and len(all_apps) > 1:
            from threading import Thread
            from collections import Counter
            threads = []
            app_types = []
            gw.info(f"Starting {len(all_apps)} apps in parallel threads.")

            # FastAPI is optional; without it no app can be a FastAPI instance.
            try:
                from fastapi import FastAPI
            except ImportError:
                FastAPI = None

            fastapi_count = 0
            for i, sub_app in enumerate(all_apps):
                is_fastapi = FastAPI is not None and isinstance(sub_app, FastAPI)
                app_type = "FastAPI" if is_fastapi else type(sub_app).__name__

                # ---- Use ws_port for the first FastAPI app if provided, else increment port as before ----
                if is_fastapi and ws_port and fastapi_count == 0:
                    port_i = int(ws_port)
                    fastapi_count += 1
                else:
                    # Use base port + i, skipping ws_port if it's in the range
                    port_i = int(port) + i
                    # Prevent port collision if ws_port == port_i (rare but possible)
                    if ws_port and port_i == int(ws_port):
                        port_i += 1

                gw.info(f"  App {i+1}: type={app_type}, port={port_i}")
                app_types.append(app_type)

                # NOTE(review): with daemon=True the worker call returns a coroutine rather
                # than serving inline; presumably the gw dispatcher schedules it - confirm.
                t = Thread(
                    target=gw.web.server.start_app,
                    kwargs=dict(
                        host=host,
                        port=port_i,
                        ws_port=None,  # Only outer thread assigns ws_port!
                        debug=debug,
                        proxy=proxy,
                        app=sub_app,
                        daemon=daemon,
                        threaded=threaded,
                        is_worker=True,
                    ),
                    daemon=daemon,
                )
                t.start()
                threads.append(t)

            type_summary = Counter(app_types)
            summary_str = ", ".join(f"{count}×{t}" for t, count in type_summary.items())
            gw.info(f"All {len(all_apps)} apps started. Types: {summary_str}")

            if not daemon:
                for t in threads:
                    t.join()
            return

        # ---- Single-app mode ----
        if not all_apps or all_apps == (None,):
            _default_app_build_count += 1
            if _default_app_build_count > 1:
                gw.warning(
                    f"Default app is being built {_default_app_build_count} times! "
                    "This may indicate a misconfiguration or repeated server setup. "
                    "Check your recipe/config. Run with --app default to silence."
                )
            app = gw.web.app.setup(app=None)
        else:
            app = all_apps[0]

        # Proxy setup
        if proxy:
            from .proxy import setup_app as setup_proxy
            app = setup_proxy(endpoint=proxy, app=app)

        # Factory support: a zero-parameter callable is treated as an app factory.
        if callable(app):
            sig = inspect.signature(app)
            if len(sig.parameters) == 0:
                gw.info(f"Calling app factory: {app}")
                maybe_app = app()
                if inspect.isawaitable(maybe_app):
                    # run_server usually executes in a worker thread, where
                    # asyncio.get_event_loop() has no loop to return and raises.
                    # asyncio.run() creates (and closes) a private loop instead; the
                    # wrapper lets it accept any awaitable, not only coroutines.
                    async def _resolve(awaitable):
                        return await awaitable
                    maybe_app = asyncio.run(_resolve(maybe_app))
                app = maybe_app
            else:
                gw.info(f"Detected callable WSGI/ASGI app: {app}")

        # ---- Detect ASGI/FastAPI ----
        try:
            from fastapi import FastAPI
            is_asgi = isinstance(app, FastAPI)
        except ImportError:
            is_asgi = False

        if is_asgi:
            # Use ws_port if provided, else use regular port
            port_to_use = int(ws_port) if ws_port else int(port)
            ws_url = f"ws://{host}:{port_to_use}"
            gw.info(f"WebSocket support active @ {ws_url}/<path>?token=...")
            try:
                import uvicorn
            except ImportError:
                raise RuntimeError("uvicorn is required to serve ASGI apps. Please install uvicorn.")

            # uvicorn.run() terminates the process when reload or more than one worker is
            # requested for an app *instance* (both need an import string), so those
            # options are not forwarded here.
            requested_workers = int(workers) if workers else 1
            if debug or requested_workers > 1:
                gw.warning(
                    "uvicorn supports reload/multiple workers only for import-string apps; "
                    "serving this app instance with a single worker and no reload."
                )
            uvicorn.run(
                app,
                host=host,
                port=port_to_use,
                log_level="debug" if debug else "info",
            )
            return

        # ---- WSGI fallback ----
        from bottle import run as bottle_run, Bottle
        try:
            from paste import httpserver
        except ImportError:
            httpserver = None

        if httpserver:
            httpserver.serve(
                app, host=host, port=int(port),
                threadpool_workers=(workers or 5),
            )
        elif isinstance(app, Bottle):
            bottle_run(
                app,
                host=host,
                port=int(port),
                debug=debug,
                threaded=threaded,
            )
        else:
            raise TypeError(f"Unsupported WSGI app type: {type(app)}")

    if daemon:
        return asyncio.to_thread(run_server)
    else:
        run_server()

Sample CLI

gway web.site view-qr-code

References

Full Code

def view_qr_code(*args, value=None, **kwargs):
    """
    Show a QR code for `value`, or the input form when no value is given.

    value: text or URL to encode. When falsy, a small POST form asking for it is returned.
    Extra positional/keyword arguments are accepted and ignored.
    Returns an HTML fragment. The image URL comes from gw.qr.generate_url(value)
    (presumably cached by that helper - confirm there).

    `value` is user input, so it is HTML-escaped before being echoed back into the page.
    """
    from html import escape  # function-scope import so the block does not rely on module imports

    if not value:
        return '''
            <h1>QR Code Generator</h1>
            <form method="post">
                <input type="text" name="value" placeholder="Enter text or URL" required class="main" />
                <button type="submit" class="submit">Generate QR</button>
            </form>
        '''
    qr_url = gw.qr.generate_url(value)
    back_link = gw.web.app_url("qr-code")
    return f"""
        <h1>QR Code for:</h1>
        <h2><code>{escape(str(value))}</code></h2>
        <img src="{qr_url}" alt="QR Code" class="qr" />
        <p><a href="{back_link}">Generate another</a></p>
    """

Sample CLI

gway web.static collect

References

Full Code

def collect(*, css="global", js="global", root="data/static", target="work/shared"):
    """
    Gather the CSS and JS files of all enabled projects and optionally bundle them.

    css / js: falsy to skip that kind; a string to also write the collected files,
        concatenated, into "<name>.css" / "<name>.js" under `target`; any other truthy
        value collects without writing a bundle.
    root: static root (resolved with gw.resource) that holds the per-project folders.
    target: resource location for the bundle files.

    For every enabled project (dots in the name map to nested folders) the project
    folder is walked recursively first, then each ancestor folder up to the static
    root contributes only the files directly inside it. A relative path is reported once.

    Returns a dict with "css" and "js" lists of (project, relative_path, full_path)
    tuples, plus "css_bundle" / "js_bundle" paths when a bundle was written.

    Directory listings are sorted so the bundle order is reproducible
    (os.walk/os.listdir return entries in arbitrary order).
    """
    enabled = getattr(gw.web.app, "enabled_projects", lambda: set())()
    # A set has no meaningful order; sort it for reproducible output. Ordered
    # containers are left untouched in case their order is intentional.
    if isinstance(enabled, (set, frozenset)):
        enabled = sorted(enabled)
    static_root = gw.resource(root)

    def find_files(kind, proj):
        """Return (proj, rel, full) for files of `kind` ("css"/"js") that belong to proj."""
        suffix = f".{kind}"
        found = []
        seen = set()

        def add(rel, full):
            if rel not in seen:
                seen.add(rel)
                found.append((proj, rel, full))

        parts = proj.split('.')
        # Recursively walk project path (yields nothing if the folder does not exist)
        proj_path = os.path.join(static_root, *parts)
        for rootdir, dirs, files in os.walk(proj_path):
            dirs.sort()  # in-place sort fixes the traversal order of sub-folders
            rel_root = os.path.relpath(rootdir, static_root)
            for fname in sorted(files):
                if fname.endswith(suffix):
                    add(os.path.join(rel_root, fname), os.path.join(rootdir, fname))
        # Ancestors (nearest first, ending at the static root), only direct files
        for depth in range(len(parts) - 1, -1, -1):
            ancestor_path = os.path.join(static_root, *parts[:depth])
            if not os.path.isdir(ancestor_path):
                continue
            rel_ancestor = os.path.relpath(ancestor_path, static_root)
            for fname in sorted(os.listdir(ancestor_path)):
                fpath = os.path.join(ancestor_path, fname)
                if not os.path.isfile(fpath) or not fname.endswith(suffix):
                    continue
                rel = os.path.join(rel_ancestor, fname) if rel_ancestor != "." else fname
                add(rel, fpath)
        return found

    report = {"css": [], "js": []}
    # CSS first, then JS; both kinds follow the same collect-then-bundle procedure.
    for kind, bundle_name in (("css", css), ("js", js)):
        if not bundle_name:
            continue
        seen_rel = set()
        for proj in enabled:
            for entry in find_files(kind, proj):
                # The same relative path can be reached from several projects; keep the first.
                if entry[1] not in seen_rel:
                    seen_rel.add(entry[1])
                    report[kind].append(entry)
        if isinstance(bundle_name, str):
            bundle_path = gw.resource(target, f"{bundle_name}.{kind}")
            with open(bundle_path, "w", encoding="utf-8") as out:
                for proj, rel, full in report[kind]:
                    with open(full, "r", encoding="utf-8") as f:
                        # Each file is preceded by a comment naming its origin.
                        if kind == "css":
                            out.write(f"/* --- {proj}:{rel} --- */\n")
                        else:
                            out.write(f"// --- {proj}:{rel} ---\n")
                        out.write(f.read() + "\n\n")
            report[f"{kind}_bundle"] = bundle_path

    return report

Sample CLI

gway build-url

References

Full Code

def build_url(*args, **kwargs):
    """
    Return base_url() followed by the path built from the given parts and query params.

    The path is produced by gw.web.app.build_url(*args, **kwargs). If that raises
    AttributeError, the positional parts are joined with '/' instead; keyword
    arguments are not used in that fallback.
    """
    from gway import gw
    try:
        url = base_url() + gw.web.app.build_url(*args, **kwargs)
    except AttributeError:
        url = base_url() + '/'.join(args)
    return url

Sample CLI

gway build-ws-url

References

Full Code

def build_ws_url(*args, **kwargs):
    """
    Return base_ws_url() followed by the path built from the given parts and query params.

    WebSocket counterpart of build_url: the path is produced by
    gw.web.app.build_url(*args, **kwargs). If that raises AttributeError, the
    positional parts are joined with '/' instead; keyword arguments are not used
    in that fallback.
    """
    from gway import gw
    try:
        ws_url = base_ws_url() + gw.web.app.build_url(*args, **kwargs)
    except AttributeError:
        ws_url = base_ws_url() + '/'.join(args)
    return ws_url