Project
Function
Sample CLI
gway ocpp setup-csms-app
References
['abort', 'debug', 'error', 'info', 'resource', 'unwrap', 'warn', 'warning']
Full Code
def setup_csms_app(*, host='[OCPP_CSMS_HOST|0.0.0.0]', port='[OCPP_CSMS_PORT|9000]', app=None, allowlist=None):
    """
    OCPP 1.6 CSMS implementation with RFID authorization.

    Registers a catch-all WebSocket route (``/{path:path}``) on a FastAPI app.
    The last path segment of the connection URL is used as the charger id.
    Every OCPP Call message (``[2, <id>, <action>, <payload>]``) is answered
    with a CallResult (``[3, <id>, ...]``):

    * ``Authorize`` -> ``{"idTagInfo": {"status": "Accepted" | "Rejected"}}``
      depending on the allowlist.
    * any other action -> ``{"status": "Accepted"}``.

    Specify an allowlist file in .cdv format (RFID: [extra fields...]).
    Each non-empty line is split on ``:``; the first field must be an
    8-character hexadecimal RFID, remaining fields are kept as extras.
    RFIDs are matched case-insensitively.

    Args:
        host: Only interpolated into the startup log message here.
        port: Only interpolated into the startup log message here.
        app: Existing app (or container of apps) handed to
            ``gw.unwrap(app, FastAPI)``; when that yields nothing truthy a
            new ``FastAPI()`` instance is created.
        allowlist: Resource name resolved with ``gw.resource``. When falsy,
            every idTag is accepted.

    Returns:
        * the new FastAPI app, if one was created and ``app`` was falsy;
        * ``(app_argument, new_app)``, if one was created and ``app`` was
          truthy;
        * the original ``app`` argument, if a FastAPI app was found in it.
    """
    # A - This block ensures we find just the kind of app we need or create one if missing
    oapp = app
    if (_is_new_app := not (app := gw.unwrap(app, FastAPI))):
        app = FastAPI()

    def load_allowlist() -> dict[str, list[str]]:
        """Parse the allowlist file into ``{RFID (upper-case): [extra fields]}``.

        Returns an empty dict when no allowlist is configured or the file
        does not exist. Invalid lines are reported with a warning and
        skipped. Any other failure is passed to ``gw.abort``.
        """
        if not allowlist:
            return {}
        try:
            path = gw.resource(allowlist)
            if not os.path.exists(path):
                gw.error(f"[OCPP] Allowlist file not found: {path}")
                return {}
            result = {}
            with open(path, "r") as f:
                for lineno, line in enumerate(f, start=1):
                    line = line.strip()
                    if not line:
                        continue
                    parts = line.split(":")
                    rfid = parts[0].strip()
                    if len(rfid) == 8 and all(c in "0123456789ABCDEFabcdef" for c in rfid):
                        # Keys are stored upper-cased: the validation above
                        # accepts either case and OCPP idTags are
                        # case-insensitive, so lookups must not depend on
                        # how the tag was typed in the file.
                        result[rfid.upper()] = [part.strip() for part in parts[1:]]
                    else:
                        gw.warn(f"[OCPP] Invalid RFID at line {lineno} in allowlist: '{line}'")
            return result
        except Exception as e:
            gw.abort(f"[OCPP] Failed to read allowlist '{allowlist}': {e}")

    def is_authorized_rfid(rfid: str) -> bool:
        """Return True when ``rfid`` may charge.

        Without a configured allowlist everything is accepted. The file is
        re-read on every call, so edits are picked up without a restart.
        """
        if not allowlist:
            return True
        if not isinstance(rfid, str):
            # Missing or malformed idTag can never match an allowlist entry.
            return False
        # NOTE(review): gw.abort presumably does not return; if it ever does,
        # load_allowlist() yields None, so fall back to an empty mapping
        # (i.e. reject) instead of raising TypeError.
        return rfid.upper() in (load_allowlist() or {})

    @app.websocket("/{path:path}")
    async def websocket_ocpp(websocket: WebSocket, path: str):
        """Serve one charger connection until it disconnects or errors."""
        charger_id = path.strip("/").split("/")[-1]
        gw.info(f"[OCPP] New WebSocket connection at /{path} (charger_id={charger_id})")
        try:
            await websocket.accept()
            _active_cons[charger_id] = websocket
            while True:
                raw = await websocket.receive_text()
                gw.info(f"[OCPP:{charger_id}] Message received: {raw}")
                try:
                    msg = json.loads(raw)
                    # Message type 2 is an OCPP Call: [2, id, action, payload?]
                    if isinstance(msg, list) and len(msg) >= 3 and msg[0] == 2:
                        message_id = msg[1]
                        action = msg[2]
                        payload = msg[3] if len(msg) > 3 else {}
                        gw.info(f"[OCPP:{charger_id}] -> Action: {action} | Payload: {payload}")
                        if action == "Authorize":
                            id_tag = payload.get("idTag")
                            accepted = is_authorized_rfid(id_tag)
                            status = "Accepted" if accepted else "Rejected"
                            response = [3, message_id, {"idTagInfo": {"status": status}}]
                            gw.info(f"[OCPP:{charger_id}] <- Authorize: {id_tag} -> {status}")
                        else:
                            response = [3, message_id, {"status": "Accepted"}]
                            gw.info(f"[OCPP:{charger_id}] <- Acknowledged {action}: {response}")
                        await websocket.send_text(json.dumps(response))
                    else:
                        gw.warning(f"[OCPP:{charger_id}] Received non-Call message or malformed")
                except Exception as e:
                    # A bad message must not tear down the connection; log it
                    # and keep reading.
                    gw.error(f"[OCPP:{charger_id}] Error parsing message: {e}")
                    gw.debug(traceback.format_exc())
        except WebSocketDisconnect:
            gw.info(f"[OCPP:{charger_id}] Disconnected")
        except Exception as e:
            gw.error(f"[OCPP:{charger_id}] WebSocket error: {e}")
            gw.debug(traceback.format_exc())
        finally:
            # Only remove the registry entry if it still refers to THIS
            # socket. A charger that reconnected (or a connection that failed
            # before registering) must not evict a newer live connection
            # stored under the same charger id.
            if _active_cons.get(charger_id) is websocket:
                _active_cons.pop(charger_id, None)

    if allowlist:
        _ = load_allowlist()  # Validates on startup
        gw.debug("Allowlist loaded without errors.")

    gw.info(f"Setup OCPP 1.6 auth sink on {host}:{port} (allowlist={allowlist})")
    # B - This return pattern ensures we include our app in the bundle (if any)
    return (app if not oapp else (oapp, app)) if _is_new_app else oapp