def render_error(title: str, message: str, *, back_link: bool = True, target: str="uploads") -> str:
"""Helper for error display with optional link back to upload main page."""
html = f"<h1>{title}</h1><p>{message}</p>"
if back_link:
url = gw.web.app.build_url(target)
html += f'<p class="error"><a href="{url}?">Back to {target} page</a></p>'
return html
def view_uploads(*, vbid: str = None, timeout: int = 60, files: int = 4, email: str = None, **kwargs):
"""
GET: Display upload interface or create a new upload box.
POST: Handle uploaded files to a specific vbid.
"""
global _gc_thread_on
if not _gc_thread_on:
threading.Thread(target=periodic_purge, daemon=True).start()
_gc_thread_on = True
admin_email = os.environ.get("ADMIN_EMAIL")
gw.info(f"Entry: vbid={vbid!r}, timeout={timeout}, files={files}, email={email!r}, method={request.method}")
# Handle file upload (POST) with a vbid (the classic file upload case)
if request.method == 'POST' and vbid:
gw.info(f"POST file upload for vbid={vbid}")
with _gc_lock:
expire = _open_boxes.get(vbid)
if not expire or expire < time.time():
gw.warning(f"vbox expired for vbid={vbid}")
return render_error("Upload Box Expired", "Please regenerate a new vbid.")
try:
short, _ = vbid.split(".", 1)
except ValueError:
gw.error(f"Invalid vbid format: {vbid}")
return render_error("Invalid vbid format", "Expected form: <code>short.long</code>.")
upload_dir = gw.resource(*VBOX_PATH, short)
os.makedirs(upload_dir, exist_ok=True)
uploaded_files = request.files.getlist("file")
results = []
for f in uploaded_files:
save_path = os.path.join(upload_dir, f.filename)
try:
f.save(save_path)
results.append(f"Uploaded {f.filename}")
gw.info(f"Uploaded {f.filename} to {short}")
except Exception as e:
results.append(f"Error uploading {f.filename}: {e}")
gw.error(f"Issue uploading {f.filename} to {short}")
gw.exception(e)
download_short_url = gw.web.app.build_url("download", vbid=short)
download_long_url = gw.web.app.build_url("download", vbid=vbid)
gw.info(f"Returning upload result UI for vbid={vbid}")
return (
"<pre>" + "\n".join(results) + "</pre>" +
f"<p><a href='?vbid={vbid}'>UPLOAD MORE files to this box</a></p>" +
f"<p><a href='{download_short_url}'>Go to PUBLIC READ-ONLY download page for this box</a></p>" +
f"<p><a href='{download_long_url}'>Go to HIDDEN WRITE download page for this box</a></p>"
)
if not vbid:
gw.info(f"No vbid present, always creating/checking box.")
remote_addr = request.remote_addr or ''
user_agent = request.headers.get('User-Agent') or ''
identity = remote_addr + user_agent
hash_digest = hashlib.sha256(identity.encode()).hexdigest()
short = hash_digest[:12]
full_id = f"{short}.{hash_digest[:40]}"
with _gc_lock:
now = time.time()
expires = _open_boxes.get(full_id)
if not expires or expires < now:
_open_boxes[full_id] = now + timeout * 60
os.makedirs(gw.resource(*VBOX_PATH, short), exist_ok=True)
url = gw.build_url("uploads", vbid=full_id)
message = f"[UPLOAD] Upload box created (expires in {timeout} min): {url}"
print(("-" * 70) + '\n' + message + '\n' + ("-" * 70))
gw.warning(message)
gw.info(f"Created new box: {full_id}")
else:
url = gw.build_url("upload", vbid=full_id)
gw.info(f"Existing box reused: {full_id}")
admin_notif = ""
sent_copy_msg = "<p>A copy of the access URL was sent to the admin.</p>"
if email:
if admin_email and email.lower() == admin_email.strip().lower():
subject = "Upload Box Link"
body = (
f"A new upload box was created.\n\n"
f"Access URL: {url}\n\n"
f"This box will expire in {timeout} minutes."
)
try:
gw.mail.send(subject, body=body, to=admin_email)
gw.info(f"Sent upload URL email to admin.")
except Exception as e:
gw.error(f"Error sending VBOX notification email: {e}")
admin_notif = sent_copy_msg
else:
admin_notif = sent_copy_msg
gw.info(f"Pretend email sent: {email!r} != {admin_email!r}")
# Show the ready box UI + the optional email form
email_form_html = (
"<form method='POST'>"
"<input type='email' name='email' required placeholder='Your email address'>"
"<button type='submit'>Request Link</button>"
"</form>"
)
form_message = (
"<p>If you are a site member, you may request a URL to be sent to your email by entering it here.</p>"
)
local_console_info = ""
if gw.web.server.is_local():
local_console_info = (
"<p>We've prepared an upload box for you. Check the console for the access URL.</p>"
"<p>To use it, go to <code>?vbid=…</code> and upload your files there.</p>"
)
return (
"<h1>Upload to Virtual Box</h1>"
f"{local_console_info}"
f"{admin_notif}"
f"{form_message if not email else ''}{email_form_html if not email else ''}"
)
# Validate and show upload UI for an existing vbid
gw.info(f"Render upload UI for vbid={vbid!r}")
with _gc_lock:
expire = _open_boxes.get(vbid)
if not expire or expire < time.time():
gw.warning(f"vbox expired for vbid={vbid}")
return render_error("Upload Box Expired or Not Found", "Please regenerate a new vbid.")
try:
short, _ = vbid.split(".", 1)
except ValueError:
gw.error(f"Invalid vbid format: {vbid}")
return render_error("Invalid vbid format", "Expected form: <code>short.long</code>.")
# Generate N file input fields
file_inputs = "\n".join(
f'<input type="file" name="file">' for _ in range(max(1, files))
)
download_url = gw.build_url("download", vbid=vbid)
gw.info(f"Displaying upload form for {short}")
return f"<h1>Upload to Box: {short}</h1>" + f"""
<form method="POST" enctype="multipart/form-data">
{file_inputs}
<br><p><button type="submit">Upload</button><p/>
</form>
<p>Files will be stored in <code>{'/'.join(VBOX_PATH)}/{short}/</code></p>
<p><a href="{download_url}">Go to download page for this box</a></p>
"""
def build_url(*args, **kwargs):
path = "/".join(str(a).strip("/") for a in args if a)
endpoint = current_endpoint()
if endpoint:
url = f"/{endpoint}/{path}" if path else f"/{endpoint}"
else:
url = f"/{path}"
if kwargs:
url += "?" + urlencode(kwargs)
return url
def current_endpoint():
"""
Return the canonical endpoint path for the current request (the project route prefix).
Falls back to gw.context['current_endpoint'], or None.
"""
return gw.context.get('current_endpoint')
def setup(*,
app=None,
project="web.site",
path=None,
home: str = None,
views: str = "view",
apis: str = "api",
static="static",
shared="shared",
css="global", # Default CSS (without .css extension)
js="global", # Default JS (without .js extension)
auth_required=False, # Default: Don't enforce --optional security
engine="bottle",
):
"""
Setup Bottle web application with symmetrical static/shared public folders.
Only one project per app. CSS/JS params are used as the only static includes.
"""
global _ver, _homes, _enabled
if engine != "bottle":
raise NotImplementedError("Only Bottle is supported at the moment.")
_ver = _ver or gw.version()
bottle.BaseRequest.MEMFILE_MAX = UPLOAD_MB * 1024 * 1024
if not isinstance(project, str) or not project:
gw.abort("Project must be a non-empty string.")
# Track project for later global static collection
_enabled.add(project)
# Always use the given project, never a list
try:
source = gw[project]
except Exception:
gw.abort(f"Project {project} not found in Gateway during app setup.")
# Default path is the dotted project name, minus any leading web/
if path is None:
path = project.replace('.', '/')
if path.startswith('web/'):
path = path.removeprefix('web/')
is_new_app = not (app := gw.unwrap_one(app, Bottle) if (oapp := app) else None)
if is_new_app:
gw.info("No Bottle app found; creating a new Bottle app.")
app = Bottle()
_homes.clear()
add_home(home, path)
@app.route("/", method=["GET", "POST"])
def index():
response.status = 302
response.set_header("Location", default_home())
return ""
@app.error(404)
def handle_404(error):
return gw.web.error.redirect(f"404 Not Found: {request.url}", err=error)
elif home:
add_home(home, path)
# Serve shared files (flat mount)
if shared:
@app.route(f"/{path}/{shared}/<filepath:path>")
@app.route(f"/{shared}/<filepath:path>")
def send_shared(filepath):
file_path = gw.resource("work", "shared", filepath)
if os.path.isfile(file_path):
return static_file(os.path.basename(file_path), root=os.path.dirname(file_path))
return HTTPResponse(status=404, body="shared file not found")
# Serve static files (flat mount)
if static:
@app.route(f"/{path}/{static}/<filepath:path>")
@app.route(f"/{static}/<filepath:path>")
def send_static(filepath):
file_path = gw.resource("data", "static", filepath)
if os.path.isfile(file_path):
return static_file(os.path.basename(file_path), root=os.path.dirname(file_path))
return HTTPResponse(status=404, body="static file not found")
# Main view dispatcher (only if views is not None)
if views:
@app.route(f"/{path}/<view:path>", method=["GET", "POST"])
def view_dispatch(view):
nonlocal home, views
# --- AUTH CHECK ---
if is_enabled('web.auth') and not gw.web.auth.is_authorized(strict=auth_required):
return gw.web.error.unauthorized("Unauthorized: You are not permitted to view this page.")
# Set current endpoint in GWAY context (for helpers/build_url etc)
gw.context['current_endpoint'] = path
segments = [s for s in view.strip("/").split("/") if s]
view_name = segments[0].replace("-", "_") if segments else home
args = segments[1:] if segments else []
kwargs = dict(request.query)
if request.method == "POST":
try:
kwargs.update(request.json or dict(request.forms))
except Exception as e:
return gw.web.error.redirect("Error loading JSON payload", error=e)
target_func_name = f"{views}_{view_name}" if views else view_name
view_func = getattr(source, target_func_name, None)
if not callable(view_func):
return gw.web.error.redirect(f"View not found: {target_func_name} in {project}")
try:
content = view_func(*args, **kwargs)
if isinstance(content, HTTPResponse):
return content
elif isinstance(content, bytes):
response.content_type = "application/octet-stream"
response.body = content
return response
elif content is None:
return ""
elif not isinstance(content, str):
content = gw.to_html(content)
except HTTPResponse as res:
return res
except Exception as e:
return gw.web.error.redirect("Broken view", err=e)
media_origin = "/shared" if shared else ("static" if static else "")
return render_template(
title="GWAY - " + view_func.__name__.replace("_", " ").title(),
content=content,
css_files=(f"{media_origin}/{css}.css",),
js_files=(f"{media_origin}/{js}.js",),
)
# API dispatcher (only if apis is not None)
if apis:
@app.route(f"/api/{path}/<view:path>", method=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])
def api_dispatch(view):
nonlocal home, apis
# --- AUTH CHECK ---
if is_enabled('web.auth') and not gw.web.auth.is_authorized(strict=auth_required):
return gw.web.error.unauthorized("Unauthorized: API access denied.")
# Set current endpoint in GWAY context (for helpers/build_url etc)
gw.context['current_endpoint'] = path
segments = [s for s in view.strip("/").split("/") if s]
view_name = segments[0].replace("-", "_") if segments else home
args = segments[1:] if segments else []
kwargs = dict(request.query)
if request.method == "POST":
try:
kwargs.update(request.json or dict(request.forms))
except Exception as e:
return gw.web.error.redirect("Error loading JSON payload", err=e)
method = request.method.lower()
specific_af = f"{apis}_{method}_{view_name}"
generic_af = f"{apis}_{view_name}"
api_func = getattr(source, specific_af, None)
if not callable(api_func):
api_func = getattr(source, generic_af, None)
if not callable(api_func):
return gw.web.error.redirect(f"API not found: {specific_af} or {generic_af} in {project}")
try:
result = api_func(*args, **kwargs)
if isinstance(result, HTTPResponse):
return result
response.content_type = "application/json"
return gw.to_json(result)
except HTTPResponse as res:
return res
except Exception as e:
return gw.web.error.redirect("Broken API", err=e)
@app.route("/favicon.ico")
def favicon():
proj_parts = project.split('.')
candidate = gw.resource("data", "static", *proj_parts, "favicon.ico")
if os.path.isfile(candidate):
return static_file("favicon.ico", root=os.path.dirname(candidate))
global_favicon = gw.resource("data", "static", "favicon.ico")
if os.path.isfile(global_favicon):
return static_file("favicon.ico", root=os.path.dirname(global_favicon))
return HTTPResponse(status=404, body="favicon.ico not found")
if gw.verbose:
gw.debug(f"Registered homes: {_homes}")
debug_routes(app)
return oapp if oapp else app
def static_file(filename, root,
mimetype=True,
download=False,
charset='UTF-8',
etag=None,
headers=None):
""" Open a file in a safe way and return an instance of :exc:`HTTPResponse`
that can be sent back to the client.
:param filename: Name or path of the file to send, relative to ``root``.
:param root: Root path for file lookups. Should be an absolute directory
path.
:param mimetype: Provide the content-type header (default: guess from
file extension)
:param download: If True, ask the browser to open a `Save as...` dialog
instead of opening the file with the associated program. You can
specify a custom filename as a string. If not specified, the
original filename is used (default: False).
:param charset: The charset for files with a ``text/*`` mime-type.
(default: UTF-8)
:param etag: Provide a pre-computed ETag header. If set to ``False``,
ETag handling is disabled. (default: auto-generate ETag header)
:param headers: Additional headers dict to add to the response.
While checking user input is always a good idea, this function provides
additional protection against malicious ``filename`` parameters from
breaking out of the ``root`` directory and leaking sensitive information
to an attacker.
Read-protected files or files outside of the ``root`` directory are
answered with ``403 Access Denied``. Missing files result in a
``404 Not Found`` response. Conditional requests (``If-Modified-Since``,
``If-None-Match``) are answered with ``304 Not Modified`` whenever
possible. ``HEAD`` and ``Range`` requests (used by download managers to
check or continue partial downloads) are also handled automatically.
"""
root = os.path.join(os.path.abspath(root), '')
filename = os.path.abspath(os.path.join(root, filename.strip('/\\')))
headers = headers.copy() if headers else {}
getenv = request.environ.get
if not filename.startswith(root):
return HTTPError(403, "Access denied.")
if not os.path.exists(filename) or not os.path.isfile(filename):
return HTTPError(404, "File does not exist.")
if not os.access(filename, os.R_OK):
return HTTPError(403, "You do not have permission to access this file.")
if mimetype is True:
name = download if isinstance(download, str) else filename
mimetype, encoding = mimetypes.guess_type(name)
if encoding == 'gzip':
mimetype = 'application/gzip'
elif encoding: # e.g. bzip2 -> application/x-bzip2
mimetype = 'application/x-' + encoding
if charset and mimetype and 'charset=' not in mimetype \
and (mimetype[:5] == 'text/' or mimetype == 'application/javascript'):
mimetype += '; charset=%s' % charset
if mimetype:
headers['Content-Type'] = mimetype
if download is True:
download = os.path.basename(filename)
if download:
download = download.replace('"','')
headers['Content-Disposition'] = 'attachment; filename="%s"' % download
stats = os.stat(filename)
headers['Content-Length'] = clen = stats.st_size
headers['Last-Modified'] = email.utils.formatdate(stats.st_mtime, usegmt=True)
headers['Date'] = email.utils.formatdate(time.time(), usegmt=True)
if etag is None:
etag = '%d:%d:%d:%d:%s' % (stats.st_dev, stats.st_ino, stats.st_mtime,
clen, filename)
etag = hashlib.sha1(tob(etag)).hexdigest()
if etag:
headers['ETag'] = etag
check = getenv('HTTP_IF_NONE_MATCH')
if check and check == etag:
return HTTPResponse(status=304, **headers)
ims = getenv('HTTP_IF_MODIFIED_SINCE')
if ims:
ims = parse_date(ims.split(";")[0].strip())
if ims is not None and ims >= int(stats.st_mtime):
return HTTPResponse(status=304, **headers)
body = '' if request.method == 'HEAD' else open(filename, 'rb')
headers["Accept-Ranges"] = "bytes"
range_header = getenv('HTTP_RANGE')
if range_header:
ranges = list(parse_range_header(range_header, clen))
if not ranges:
return HTTPError(416, "Requested Range Not Satisfiable")
offset, end = ranges[0]
rlen = end - offset
headers["Content-Range"] = "bytes %d-%d/%d" % (offset, end - 1, clen)
headers["Content-Length"] = str(rlen)
if body: body = _closeiter(_rangeiter(body, offset, rlen), body.close)
return HTTPResponse(body, status=206, **headers)
return HTTPResponse(body, **headers)
def template(*args, **kwargs):
"""
Get a rendered template as a string iterator.
You can use a name, a filename or a template string as first parameter.
Template rendering arguments can be passed as dictionaries
or directly (as keyword arguments).
"""
tpl = args[0] if args else None
for dictarg in args[1:]:
kwargs.update(dictarg)
adapter = kwargs.pop('template_adapter', SimpleTemplate)
lookup = kwargs.pop('template_lookup', TEMPLATE_PATH)
tplid = (id(lookup), tpl)
if tplid not in TEMPLATES or DEBUG:
settings = kwargs.pop('template_settings', {})
if isinstance(tpl, adapter):
TEMPLATES[tplid] = tpl
if settings: TEMPLATES[tplid].prepare(**settings)
elif "\n" in tpl or "{" in tpl or "%" in tpl or '$' in tpl:
TEMPLATES[tplid] = adapter(source=tpl, lookup=lookup, **settings)
else:
TEMPLATES[tplid] = adapter(name=tpl, lookup=lookup, **settings)
if not TEMPLATES[tplid]:
abort(500, 'Template (%s) not found' % tpl)
return TEMPLATES[tplid].render(kwargs)
def urlencode(query, doseq=False, safe='', encoding=None, errors=None,
quote_via=quote_plus):
"""Encode a dict or sequence of two-element tuples into a URL query string.
If any values in the query arg are sequences and doseq is true, each
sequence element is converted to a separate parameter.
If the query arg is a sequence of two-element tuples, the order of the
parameters in the output will match the order of parameters in the
input.
The components of a query arg may each be either a string or a bytes type.
The safe, encoding, and errors parameters are passed down to the function
specified by quote_via (encoding and errors only if a component is a str).
"""
if hasattr(query, "items"):
query = query.items()
else:
# It's a bother at times that strings and string-like objects are
# sequences.
try:
# non-sequence items should not work with len()
# non-empty strings will fail this
if len(query) and not isinstance(query[0], tuple):
raise TypeError
# Zero-length sequences of all types will get here and succeed,
# but that's a minor nit. Since the original implementation
# allowed empty dicts that type of behavior probably should be
# preserved for consistency
except TypeError as err:
raise TypeError("not a valid non-string sequence "
"or mapping object") from err
l = []
if not doseq:
for k, v in query:
if isinstance(k, bytes):
k = quote_via(k, safe)
else:
k = quote_via(str(k), safe, encoding, errors)
if isinstance(v, bytes):
v = quote_via(v, safe)
else:
v = quote_via(str(v), safe, encoding, errors)
l.append(k + '=' + v)
else:
for k, v in query:
if isinstance(k, bytes):
k = quote_via(k, safe)
else:
k = quote_via(str(k), safe, encoding, errors)
if isinstance(v, bytes):
v = quote_via(v, safe)
l.append(k + '=' + v)
elif isinstance(v, str):
v = quote_via(v, safe, encoding, errors)
l.append(k + '=' + v)
else:
try:
# Is this a sufficient test for sequence-ness?
x = len(v)
except TypeError:
# not a sequence
v = quote_via(str(v), safe, encoding, errors)
l.append(k + '=' + v)
else:
# loop over the sequence
for elt in v:
if isinstance(elt, bytes):
elt = quote_via(elt, safe)
else:
elt = quote_via(str(elt), safe, encoding, errors)
l.append(k + '=' + elt)
return '&'.join(l)
def get_style():
"""
Returns the current user's preferred style path (to .css file), checking:
- URL ?css= param (for preview/testing)
- 'css' cookie
- First available style, or '/static/styles/base.css' if none found
This should be called by render_template for every page load.
"""
styles = list_styles()
style_cookie = gw.web.cookies.get("css") if gw.web.app.is_enabled('web.cookies') else None
style_query = request.query.get("css")
style_path = None
# Prefer query param (if exists and valid)
if style_query:
for src, fname in styles:
if fname == style_query:
style_path = f"/static/styles/{fname}" if src == "global" else f"/static/{src}/styles/{fname}"
break
# Otherwise, prefer cookie
if not style_path and style_cookie:
for src, fname in styles:
if fname == style_cookie:
style_path = f"/static/styles/{fname}" if src == "global" else f"/static/{src}/styles/{fname}"
break
# Otherwise, first available style
if not style_path and styles:
src, fname = styles[0]
style_path = f"/static/styles/{fname}" if src == "global" else f"/static/{src}/styles/{fname}"
# Fallback to base
return style_path or "/static/styles/base.css"
def view_style_switcher(*, css=None, project=None):
"""
Shows available styles (global + project), lets user choose, preview, and see raw CSS.
If cookies are accepted, sets the style via cookie when changed in dropdown.
If cookies are not accepted, only uses the css param for preview.
"""
import os
from bottle import request, response
# Determine the project from context or fallback if not provided
if not project:
path = request.fullpath.strip("/").split("/")
if path and path[0] in ("conway", "awg", "site", "etron"):
project = path[0]
else:
project = "site"
def list_styles_local(project):
seen = set()
styles = []
# Global styles
global_dir = gw.resource("data", "static", "styles")
if os.path.isdir(global_dir):
for f in sorted(os.listdir(global_dir)):
if f.endswith(".css") and os.path.isfile(os.path.join(global_dir, f)):
if f not in seen:
styles.append(("global", f))
seen.add(f)
if project:
proj_dir = gw.resource("data", "static", project, "styles")
if os.path.isdir(proj_dir):
for f in sorted(os.listdir(proj_dir)):
if f.endswith(".css") and os.path.isfile(os.path.join(proj_dir, f)):
if f not in seen:
styles.append((project, f))
seen.add(f)
return styles
styles = list_styles_local(project)
all_styles = [fname for _, fname in styles]
style_sources = {fname: src for src, fname in styles}
cookies_enabled = gw.web.app.is_enabled('web.cookies')
cookies_accepted = gw.web.cookies.check_consent() if cookies_enabled else False
css_cookie = gw.web.cookies.get("css")
# Handle POST
if request.method == "POST":
selected_style = request.forms.get("css")
if cookies_enabled and cookies_accepted and selected_style and selected_style in all_styles:
gw.web.cookies.set("css", selected_style)
response.status = 303
response.set_header("Location", request.fullpath)
return ""
# --- THIS IS THE MAIN LOGIC: ---
# Priority: query param > explicit function arg > cookie > default
style_query = request.query.get("css")
selected_style = (
style_query if style_query in all_styles else
(css if css in all_styles else
(css_cookie if css_cookie in all_styles else
(all_styles[0] if all_styles else "base.css")))
)
# If still not valid, fallback to default
if selected_style not in all_styles:
selected_style = all_styles[0] if all_styles else "base.css"
# Determine preview link and path for raw CSS
if style_sources.get(selected_style) == "global":
preview_href = f"/static/styles/{selected_style}"
css_path = gw.resource("data", "static", "styles", selected_style)
css_link = f'<link rel="stylesheet" href="/static/styles/{selected_style}">'
else:
preview_href = f"/static/{project}/styles/{selected_style}"
css_path = gw.resource("data", "static", project, "styles", selected_style)
css_link = f'<link rel="stylesheet" href="/static/{project}/styles/{selected_style}">'
preview_html = f"""
{css_link}
<div class="style-preview">
<h2>Theme Preview: {selected_style[:-4].replace('_', ' ').title()}</h2>
<p>This is a preview of the <b>{selected_style}</b> theme.</p>
<button>Sample button</button>
<pre>code block</pre>
</div>
"""
css_code = ""
try:
with open(css_path, encoding="utf-8") as f:
css_code = f.read()
except Exception:
css_code = "Could not load CSS file."
selector = style_selector_form(
all_styles=styles,
selected_style=selected_style,
cookies_enabled=cookies_enabled,
cookies_accepted=cookies_accepted,
project=project
)
return f"""
<h1>Select a Site Theme</h1>
{selector}
{preview_html}
<h3>CSS Source: {selected_style}</h3>
<pre style="max-height:400px;overflow:auto;">{html_escape(css_code)}</pre>
"""
def start_app(*,
host="[WEBSITE_HOST|127.0.0.1]",
port="[WEBSITE_PORT|8888]",
ws_port="[WEBSOCKET_PORT|9000]",
debug=False,
proxy=None,
app=None,
daemon=True,
threaded=True,
is_worker=False,
workers=None,
):
"""
Start an HTTP (WSGI) or ASGI server to host the given application.
- If `app` is a FastAPI instance, runs with Uvicorn (optionally on ws_port if set).
- If `app` is a WSGI app, uses Paste+ws4py or Bottle.
- If `app` is a zero-arg factory, it will be invoked (supporting sync or async factories).
- If `app` is a list of apps, each will be run in its own thread (each on an incremented port; FastAPI uses ws_port if set).
"""
import inspect
import asyncio
host = gw.resolve(host) if isinstance(host, str) else host
port = gw.resolve(port) if isinstance(port, str) else port
ws_port = gw.resolve(ws_port) if isinstance(ws_port, str) else ws_port
def run_server():
nonlocal app
all_apps = app if iterable(app) else (app, )
# ---- Multi-app mode ----
if not is_worker and len(all_apps) > 1:
from threading import Thread
from collections import Counter
threads = []
app_types = []
gw.info(f"Starting {len(all_apps)} apps in parallel threads.")
fastapi_count = 0
for i, sub_app in enumerate(all_apps):
try:
from fastapi import FastAPI
is_fastapi = isinstance(sub_app, FastAPI)
app_type = "FastAPI" if is_fastapi else type(sub_app).__name__
except ImportError:
is_fastapi = False
app_type = type(sub_app).__name__
# ---- Use ws_port for the first FastAPI app if provided, else increment port as before ----
if is_fastapi and ws_port and fastapi_count == 0:
port_i = int(ws_port)
fastapi_count += 1
else:
# Use base port + i, skipping ws_port if it's in the range
port_i = int(port) + i
# Prevent port collision if ws_port == port_i (rare but possible)
if ws_port and port_i == int(ws_port):
port_i += 1
gw.info(f" App {i+1}: type={app_type}, port={port_i}")
app_types.append(app_type)
t = Thread(
target=gw.web.server.start_app,
kwargs=dict(
host=host,
port=port_i,
ws_port=None, # Only outer thread assigns ws_port!
debug=debug,
proxy=proxy,
app=sub_app,
daemon=daemon,
threaded=threaded,
is_worker=True,
),
daemon=daemon,
)
t.start()
threads.append(t)
type_summary = Counter(app_types)
summary_str = ", ".join(f"{count}×{t}" for t, count in type_summary.items())
gw.info(f"All {len(all_apps)} apps started. Types: {summary_str}")
if not daemon:
for t in threads:
t.join()
return
# ---- Single-app mode ----
global _default_app_build_count
if not all_apps or all_apps == (None,):
_default_app_build_count += 1
if _default_app_build_count > 1:
gw.warning(
f"Default app is being built {_default_app_build_count} times! "
"This may indicate a misconfiguration or repeated server setup. "
"Check your recipe/config. Run with --app default to silence."
)
app = gw.web.app.setup(app=None)
else:
app = all_apps[0]
# Proxy setup (unchanged)
if proxy:
from .proxy import setup_app as setup_proxy
app = setup_proxy(endpoint=proxy, app=app)
# Factory support (unchanged)
if callable(app):
sig = inspect.signature(app)
if len(sig.parameters) == 0:
gw.info(f"Calling app factory: {app}")
maybe_app = app()
if inspect.isawaitable(maybe_app):
maybe_app = asyncio.get_event_loop().run_until_complete(maybe_app)
app = maybe_app
else:
gw.info(f"Detected callable WSGI/ASGI app: {app}")
# ---- Detect ASGI/FastAPI ----
try:
from fastapi import FastAPI
is_asgi = isinstance(app, FastAPI)
except ImportError:
is_asgi = False
if is_asgi:
# Use ws_port if provided, else use regular port
port_to_use = int(ws_port) if ws_port else int(port)
ws_url = f"ws://{host}:{port_to_use}"
gw.info(f"WebSocket support active @ {ws_url}/<path>?token=...")
try:
import uvicorn
except ImportError:
raise RuntimeError("uvicorn is required to serve ASGI apps. Please install uvicorn.")
uvicorn.run(
app,
host=host,
port=port_to_use,
log_level="debug" if debug else "info",
workers=workers or 1,
reload=debug,
)
return
# ---- WSGI fallback (unchanged) ----
from bottle import run as bottle_run, Bottle
try:
from paste import httpserver
except ImportError:
httpserver = None
try:
from ws4py.server.wsgiutils import WebSocketWSGIApplication
ws4py_available = True
except ImportError:
ws4py_available = False
if httpserver:
httpserver.serve(
app, host=host, port=int(port),
threadpool_workers=(workers or 5),
)
elif isinstance(app, Bottle):
bottle_run(
app,
host=host,
port=int(port),
debug=debug,
threaded=threaded,
)
else:
raise TypeError(f"Unsupported WSGI app type: {type(app)}")
if daemon:
return asyncio.to_thread(run_server)
else:
run_server()
def view_qr_code(*args, value=None, **kwargs):
"""Generate a QR code for a given value and serve it from cache if available."""
if not value:
return '''
<h1>QR Code Generator</h1>
<form method="post">
<input type="text" name="value" placeholder="Enter text or URL" required class="main" />
<button type="submit" class="submit">Generate QR</button>
</form>
'''
qr_url = gw.qr.generate_url(value)
back_link = gw.web.app_url("qr-code")
return f"""
<h1>QR Code for:</h1>
<h2><code>{value}</code></h2>
<img src="{qr_url}" alt="QR Code" class="qr" />
<p><a href="{back_link}">Generate another</a></p>
"""
def collect(*, css="global", js="global", root="data/static", target="work/shared"):
enabled = getattr(gw.web.app, "enabled_projects", lambda: set())()
static_root = gw.resource(root)
def find_files(kind, proj):
found = []
seen = set()
parts = proj.split('.')
# Recursively walk project path
if parts:
proj_path = os.path.join(static_root, *parts)
for rootdir, dirs, files in os.walk(proj_path):
rel_root = os.path.relpath(rootdir, static_root)
for fname in files:
if kind == "css" and fname.endswith(".css"):
rel = os.path.join(rel_root, fname)
elif kind == "js" and fname.endswith(".js"):
rel = os.path.join(rel_root, fname)
else:
continue
if rel not in seen:
seen.add(rel)
found.append((proj, rel, os.path.join(rootdir, fname)))
# Ancestors, only direct files
for i in range(len(parts)-1, -1, -1):
ancestor_path = os.path.join(static_root, *parts[:i])
if not os.path.isdir(ancestor_path):
continue
rel_ancestor = os.path.relpath(ancestor_path, static_root)
for fname in os.listdir(ancestor_path):
fpath = os.path.join(ancestor_path, fname)
if not os.path.isfile(fpath):
continue
if kind == "css" and fname.endswith(".css"):
rel = os.path.join(rel_ancestor, fname) if rel_ancestor != "." else fname
elif kind == "js" and fname.endswith(".js"):
rel = os.path.join(rel_ancestor, fname) if rel_ancestor != "." else fname
else:
continue
if rel not in seen:
seen.add(rel)
found.append((proj, rel, fpath))
return found
report = {"css": [], "js": []}
# --- Collect CSS ---
if css:
all_css = []
for proj in enabled:
all_css.extend(find_files("css", proj))
seen_css = set()
for entry in all_css:
if entry[1] not in seen_css:
seen_css.add(entry[1])
report["css"].append(entry)
if isinstance(css, str):
bundle_path = gw.resource(target, f"{css}.css")
with open(bundle_path, "w", encoding="utf-8") as out:
for proj, rel, full in report["css"]:
with open(full, "r", encoding="utf-8") as f:
out.write(f"/* --- {proj}:{rel} --- */\n")
out.write(f.read() + "\n\n")
report["css_bundle"] = bundle_path
# --- Collect JS ---
if js:
all_js = []
for proj in enabled:
all_js.extend(find_files("js", proj))
seen_js = set()
for entry in all_js:
if entry[1] not in seen_js:
seen_js.add(entry[1])
report["js"].append(entry)
if isinstance(js, str):
bundle_path = gw.resource(target, f"{js}.js")
with open(bundle_path, "w", encoding="utf-8") as out:
for proj, rel, full in report["js"]:
with open(full, "r", encoding="utf-8") as f:
out.write(f"// --- {proj}:{rel} ---\n")
out.write(f.read() + "\n\n")
report["js_bundle"] = bundle_path
return report