Sample CLI
gway ocpp.csms dispatch-action
Full Code
def dispatch_action(charger_id: str, action: str):
    """
    Dispatch a remote admin action to the charger over OCPP via websocket.

    Supported actions:
      * ``"remote_stop"``  - sends a ``RemoteStopTransaction`` call carrying the
        ``transactionId`` recorded in ``_transactions`` for this charger.
      * ``"reset_hard"`` / ``"reset_soft"`` - sends a ``Reset`` call whose
        ``type`` is the capitalized suffix (``Hard`` or ``Soft``).
      * ``"disconnect"``   - closes the websocket with code 1000.

    The websocket coroutine is scheduled on ``_csms_loop`` from the calling
    thread; this function does not wait for it to complete.

    Args:
        charger_id: Key used to look up the connection in ``_active_cons``
            (and the transaction in ``_transactions``).
        action: One of the action names listed above.

    Returns:
        ``{"status": "requested", "messageId": <uuid4 string>}``. Note that the
        same dict is returned when no CSMS event loop is available; in that
        case only a warning is logged and nothing is sent.

    Raises:
        HTTPError: 404 if the charger has no active connection or (for
            ``remote_stop``) no transaction; 400 if the action is unknown or
            the reset mode is not ``Hard``/``Soft``.
    """
    ws = _active_cons.get(charger_id)
    if not ws:
        raise HTTPError(404, "No active connection")

    msg_id = str(uuid.uuid4())

    # Resolve the action into (async callable, args, kwargs) WITHOUT calling it.
    # The coroutine object is only created once we know it will be scheduled,
    # otherwise it would be left un-awaited ("coroutine was never awaited").
    if action == "remote_stop":
        tx = _transactions.get(charger_id)
        if not tx:
            raise HTTPError(404, "No transaction to stop")
        frame = json.dumps(
            [2, msg_id, "RemoteStopTransaction", {"transactionId": tx["transactionId"]}]
        )
        method, args, kwargs = ws.send_text, (frame,), {}
    elif action.startswith("reset_"):
        mode = action.partition("_")[2].capitalize()
        # RemoteStopTransaction above implies OCPP 1.6, where Reset.req only
        # accepts "Hard" or "Soft"; reject anything else instead of sending an
        # invalid request to the charger.
        if mode not in ("Hard", "Soft"):
            raise HTTPError(400, f"Unknown action: {action}")
        frame = json.dumps([2, msg_id, "Reset", {"type": mode}])
        method, args, kwargs = ws.send_text, (frame,), {}
    elif action == "disconnect":
        method, args = ws.close, ()
        kwargs = {"code": 1000, "reason": "Admin disconnect"}
    else:
        raise HTTPError(400, f"Unknown action: {action}")

    # Snapshot the loop so the scheduled callback does not re-read the global,
    # which could have been cleared by the time the callback runs.
    loop = _csms_loop
    if loop:
        def _schedule():
            # Runs on the event-loop thread: create the coroutine and its task there.
            loop.create_task(method(*args, **kwargs))

        loop.call_soon_threadsafe(_schedule)
    else:
        gw.warn("No CSMS event loop; action not sent")

    return {"status": "requested", "messageId": msg_id}