def view_uploads(*, vbid: str = None, timeout: int = 60, files: int = 4, email: str = None, **kwargs):
"""
GET: Display upload interface or create a new upload box.
POST: Handle uploaded files to a specific vbid.
"""
global _gc_thread_on
if not _gc_thread_on:
threading.Thread(target=periodic_purge, daemon=True).start()
_gc_thread_on = True
admin_email = os.environ.get("ADMIN_EMAIL")
gw.info(f"Entry: vbid={vbid!r}, timeout={timeout}, files={files}, email={email!r}, method={request.method}")
# Handle file upload (POST) with a vbid (the classic file upload case)
if request.method == 'POST' and vbid:
gw.info(f"POST file upload for vbid={vbid}")
with _gc_lock:
expire = _open_boxes.get(vbid)
if not expire or expire < time.time():
gw.warning(f"vbox expired for vbid={vbid}")
return render_error("Upload Box Expired", "Please regenerate a new vbid.")
try:
short, _ = vbid.split(".", 1)
except ValueError:
gw.error(f"Invalid vbid format: {vbid}")
return render_error("Invalid vbid format", "Expected form: <code>short.long</code>.")
upload_dir = gw.resource(*VBOX_PATH, short)
os.makedirs(upload_dir, exist_ok=True)
uploaded_files = request.files.getlist("file")
results = []
for f in uploaded_files:
save_path = os.path.join(upload_dir, f.filename)
try:
f.save(save_path)
results.append(f"Uploaded {f.filename}")
gw.info(f"Uploaded {f.filename} to {short}")
except Exception as e:
results.append(f"Error uploading {f.filename}: {e}")
gw.error(f"Issue uploading {f.filename} to {short}")
gw.exception(e)
download_short_url = gw.web.app.build_url("download", vbid=short)
download_long_url = gw.web.app.build_url("download", vbid=vbid)
gw.info(f"Returning upload result UI for vbid={vbid}")
return (
"<pre>" + "\n".join(results) + "</pre>" +
f"<p><a href='?vbid={vbid}'>UPLOAD MORE files to this box</a></p>" +
f"<p><a href='{download_short_url}'>Go to PUBLIC READ-ONLY download page for this box</a></p>" +
f"<p><a href='{download_long_url}'>Go to HIDDEN WRITE download page for this box</a></p>"
)
if not vbid:
gw.info(f"No vbid present, always creating/checking box.")
remote_addr = request.remote_addr or ''
user_agent = request.headers.get('User-Agent') or ''
identity = remote_addr + user_agent
hash_digest = hashlib.sha256(identity.encode()).hexdigest()
short = hash_digest[:12]
full_id = f"{short}.{hash_digest[:40]}"
with _gc_lock:
now = time.time()
expires = _open_boxes.get(full_id)
if not expires or expires < now:
_open_boxes[full_id] = now + timeout * 60
os.makedirs(gw.resource(*VBOX_PATH, short), exist_ok=True)
url = gw.build_url("uploads", vbid=full_id)
message = f"[UPLOAD] Upload box created (expires in {timeout} min): {url}"
print(("-" * 70) + '\n' + message + '\n' + ("-" * 70))
gw.warning(message)
gw.info(f"Created new box: {full_id}")
else:
url = gw.build_url("upload", vbid=full_id)
gw.info(f"Existing box reused: {full_id}")
admin_notif = ""
sent_copy_msg = "<p>A copy of the access URL was sent to the admin.</p>"
if email:
if admin_email and email.lower() == admin_email.strip().lower():
subject = "Upload Box Link"
body = (
f"A new upload box was created.\n\n"
f"Access URL: {url}\n\n"
f"This box will expire in {timeout} minutes."
)
try:
gw.mail.send(subject, body=body, to=admin_email)
gw.info(f"Sent upload URL email to admin.")
except Exception as e:
gw.error(f"Error sending VBOX notification email: {e}")
admin_notif = sent_copy_msg
else:
admin_notif = sent_copy_msg
gw.info(f"Pretend email sent: {email!r} != {admin_email!r}")
# Show the ready box UI + the optional email form
email_form_html = (
"<form method='POST'>"
"<input type='email' name='email' required placeholder='Your email address'>"
"<button type='submit'>Request Link</button>"
"</form>"
)
form_message = (
"<p>If you are a site member, you may request a URL to be sent to your email by entering it here.</p>"
)
local_console_info = ""
if gw.web.server.is_local():
local_console_info = (
"<p>We've prepared an upload box for you. Check the console for the access URL.</p>"
"<p>To use it, go to <code>?vbid=…</code> and upload your files there.</p>"
)
return (
"<h1>Upload to Virtual Box</h1>"
f"{local_console_info}"
f"{admin_notif}"
f"{form_message if not email else ''}{email_form_html if not email else ''}"
)
# Validate and show upload UI for an existing vbid
gw.info(f"Render upload UI for vbid={vbid!r}")
with _gc_lock:
expire = _open_boxes.get(vbid)
if not expire or expire < time.time():
gw.warning(f"vbox expired for vbid={vbid}")
return render_error("Upload Box Expired or Not Found", "Please regenerate a new vbid.")
try:
short, _ = vbid.split(".", 1)
except ValueError:
gw.error(f"Invalid vbid format: {vbid}")
return render_error("Invalid vbid format", "Expected form: <code>short.long</code>.")
# Generate N file input fields
file_inputs = "\n".join(
f'<input type="file" name="file">' for _ in range(max(1, files))
)
download_url = gw.build_url("download", vbid=vbid)
gw.info(f"Displaying upload form for {short}")
return f"<h1>Upload to Box: {short}</h1>" + f"""
<form method="POST" enctype="multipart/form-data">
{file_inputs}
<br><p><button type="submit">Upload</button><p/>
</form>
<p>Files will be stored in <code>{'/'.join(VBOX_PATH)}/{short}/</code></p>
<p><a href="{download_url}">Go to download page for this box</a></p>
"""
def setup_fallback_app(*,
endpoint: str, app=None, websockets: bool = False, path: str = "/",
mode: str = "extend", callback=None,
):
"""
Create an HTTP (and optional WebSocket) fallback to the given endpoint.
This asumes the given endpoint will replicate or provide missing functionality
or the entire service if it can't be provided locally.
"""
# selectors for app types
from bottle import Bottle
# TODO: Implement a mode kwarg that defaults to "extend" and functions like this:
# replace: Replace all paths in the received apps with the proxied endpoint.
# extend: Redirect all paths not already configured to the proxy.
# errors: Catch errors thrown by the app and redirect the failed calls to the proxy.
# trigger: Use a callback function to check. Redirects when result is True.
# Move this explanation to the docstring.
# TODO: We need to use gw.unwrap_all instead and apply the proxy mode to each of the
# apps found there, then we need to return all those apps in a collection.
# collect apps by type
bottle_app = gw.unwrap_one(app, Bottle)
fastapi_app = gw.unwrap_one(app, FastAPI)
prepared = []
# if no matching apps, default to a new Bottle
if not bottle_app and not fastapi_app:
default = Bottle()
prepared.append(_wire_proxy(default, endpoint, websockets, path))
elif bottle_app:
prepared.append(_wire_proxy(bottle_app, endpoint, websockets, path))
elif fastapi_app:
prepared.append(_wire_proxy(fastapi_app, endpoint, websockets, path))
# TODO: Test that this return is properly compatible with web.server.start_app after the fixes
return prepared[0] if len(prepared) == 1 else tuple(prepared)
def is_local(request=None, host=None):
"""
Returns True if the active HTTP request originates from the same machine
that the server is running on (i.e., local request). Supports both
Bottle and FastAPI (ASGI/WSGI).
Args:
request: Optionally, the request object (Bottle, Starlette, or FastAPI Request).
host: Optionally, the bound host (for override or testing).
Returns:
bool: True if request is from localhost, else False.
"""
try:
# --- Try to infer the active request if not given ---
if request is None:
# Try FastAPI/Starlette
try:
from starlette.requests import Request as StarletteRequest
import contextvars
req_var = contextvars.ContextVar("request")
request = req_var.get()
except Exception:
pass
# Try Bottle global
if request is None:
try:
from bottle import request as bottle_request
request = bottle_request
except ImportError:
request = None
# --- Get remote address from request ---
remote_addr = None
if request is not None:
# FastAPI/Starlette: request.client.host
remote_addr = getattr(getattr(request, "client", None), "host", None)
# Bottle: request.remote_addr
if not remote_addr:
remote_addr = getattr(request, "remote_addr", None)
# Try request.environ['REMOTE_ADDR']
if not remote_addr and hasattr(request, "environ"):
remote_addr = request.environ.get("REMOTE_ADDR")
else:
# No request in context
return False
# --- Get server bound address ---
if host is None:
from gway import gw
host = gw.resolve("[WEBSITE_HOST|127.0.0.1]")
# If host is empty or all-interfaces, assume not local
if not host or host in ("0.0.0.0", "::", ""):
return False
# --- Normalize addresses for comparison ---
def _norm(addr):
if addr in ("localhost",):
return "127.0.0.1"
if addr in ("::1",):
return "127.0.0.1"
try:
# Try resolving hostname
return socket.gethostbyname(addr)
except Exception:
return addr
remote_ip = _norm(remote_addr)
host_ip = _norm(host)
# Accept both IPv4 and IPv6 loopback equivalence
return remote_ip.startswith("127.") or remote_ip == host_ip
except Exception as ex:
import traceback
print(f"[is_local] error: {ex}\n{traceback.format_exc()}")
return False
@set_module('numpy')
def iterable(y):
"""
Check whether or not an object can be iterated over.
Parameters
----------
y : object
Input object.
Returns
-------
b : bool
Return ``True`` if the object has an iterator method or is a
sequence and ``False`` otherwise.
Examples
--------
>>> import numpy as np
>>> np.iterable([1, 2, 3])
True
>>> np.iterable(2)
False
Notes
-----
In most cases, the results of ``np.iterable(obj)`` are consistent with
``isinstance(obj, collections.abc.Iterable)``. One notable exception is
the treatment of 0-dimensional arrays::
>>> from collections.abc import Iterable
>>> a = np.array(1.0) # 0-dimensional numpy array
>>> isinstance(a, Iterable)
True
>>> np.iterable(a)
False
"""
try:
iter(y)
except TypeError:
return False
return True
def start_app(*,
host="[WEBSITE_HOST|127.0.0.1]",
port="[WEBSITE_PORT|8888]",
ws_port="[WEBSOCKET_PORT|9000]",
debug=False,
proxy=None,
app=None,
daemon=True,
threaded=True,
is_worker=False,
workers=None,
):
"""
Start an HTTP (WSGI) or ASGI server to host the given application.
- If `app` is a FastAPI instance, runs with Uvicorn (optionally on ws_port if set).
- If `app` is a WSGI app, uses Paste+ws4py or Bottle.
- If `app` is a zero-arg factory, it will be invoked (supporting sync or async factories).
- If `app` is a list of apps, each will be run in its own thread (each on an incremented port; FastAPI uses ws_port if set).
"""
import inspect
import asyncio
host = gw.resolve(host) if isinstance(host, str) else host
port = gw.resolve(port) if isinstance(port, str) else port
ws_port = gw.resolve(ws_port) if isinstance(ws_port, str) else ws_port
def run_server():
nonlocal app
all_apps = app if iterable(app) else (app, )
# ---- Multi-app mode ----
if not is_worker and len(all_apps) > 1:
from threading import Thread
from collections import Counter
threads = []
app_types = []
gw.info(f"Starting {len(all_apps)} apps in parallel threads.")
fastapi_count = 0
for i, sub_app in enumerate(all_apps):
try:
from fastapi import FastAPI
is_fastapi = isinstance(sub_app, FastAPI)
app_type = "FastAPI" if is_fastapi else type(sub_app).__name__
except ImportError:
is_fastapi = False
app_type = type(sub_app).__name__
# ---- Use ws_port for the first FastAPI app if provided, else increment port as before ----
if is_fastapi and ws_port and fastapi_count == 0:
port_i = int(ws_port)
fastapi_count += 1
else:
# Use base port + i, skipping ws_port if it's in the range
port_i = int(port) + i
# Prevent port collision if ws_port == port_i (rare but possible)
if ws_port and port_i == int(ws_port):
port_i += 1
gw.info(f" App {i+1}: type={app_type}, port={port_i}")
app_types.append(app_type)
t = Thread(
target=gw.web.server.start_app,
kwargs=dict(
host=host,
port=port_i,
ws_port=None, # Only outer thread assigns ws_port!
debug=debug,
proxy=proxy,
app=sub_app,
daemon=daemon,
threaded=threaded,
is_worker=True,
),
daemon=daemon,
)
t.start()
threads.append(t)
type_summary = Counter(app_types)
summary_str = ", ".join(f"{count}×{t}" for t, count in type_summary.items())
gw.info(f"All {len(all_apps)} apps started. Types: {summary_str}")
if not daemon:
for t in threads:
t.join()
return
# ---- Single-app mode ----
global _default_app_build_count
if not all_apps or all_apps == (None,):
_default_app_build_count += 1
if _default_app_build_count > 1:
gw.warning(
f"Default app is being built {_default_app_build_count} times! "
"This may indicate a misconfiguration or repeated server setup. "
"Check your recipe/config. Run with --app default to silence."
)
app = gw.web.app.setup(app=None)
else:
app = all_apps[0]
# Proxy setup (unchanged)
if proxy:
from .proxy import setup_app as setup_proxy
app = setup_proxy(endpoint=proxy, app=app)
# Factory support (unchanged)
if callable(app):
sig = inspect.signature(app)
if len(sig.parameters) == 0:
gw.info(f"Calling app factory: {app}")
maybe_app = app()
if inspect.isawaitable(maybe_app):
maybe_app = asyncio.get_event_loop().run_until_complete(maybe_app)
app = maybe_app
else:
gw.info(f"Detected callable WSGI/ASGI app: {app}")
# ---- Detect ASGI/FastAPI ----
try:
from fastapi import FastAPI
is_asgi = isinstance(app, FastAPI)
except ImportError:
is_asgi = False
if is_asgi:
# Use ws_port if provided, else use regular port
port_to_use = int(ws_port) if ws_port else int(port)
ws_url = f"ws://{host}:{port_to_use}"
gw.info(f"WebSocket support active @ {ws_url}/<path>?token=...")
try:
import uvicorn
except ImportError:
raise RuntimeError("uvicorn is required to serve ASGI apps. Please install uvicorn.")
uvicorn.run(
app,
host=host,
port=port_to_use,
log_level="debug" if debug else "info",
workers=workers or 1,
reload=debug,
)
return
# ---- WSGI fallback (unchanged) ----
from bottle import run as bottle_run, Bottle
try:
from paste import httpserver
except ImportError:
httpserver = None
try:
from ws4py.server.wsgiutils import WebSocketWSGIApplication
ws4py_available = True
except ImportError:
ws4py_available = False
if httpserver:
httpserver.serve(
app, host=host, port=int(port),
threadpool_workers=(workers or 5),
)
elif isinstance(app, Bottle):
bottle_run(
app,
host=host,
port=int(port),
debug=debug,
threaded=threaded,
)
else:
raise TypeError(f"Unsupported WSGI app type: {type(app)}")
if daemon:
return asyncio.to_thread(run_server)
else:
run_server()