def setup_app(*,
app=None,
allowlist=None,
denylist=None,
location=None,
authorize=authorize_balance,
email=None,
):
global _transactions, _active_cons, _abnormal_status
email = email if isinstance(email, str) else gw.resolve('[ADMIN_EMAIL]')
oapp = app
from fastapi import FastAPI as _FastAPI
if (_is_new_app := not (app := gw.unwrap_one(app, _FastAPI))):
app = _FastAPI()
validator = None
if isinstance(authorize, str):
validator = gw[authorize]
elif callable(authorize):
validator = authorize
def is_authorized_rfid(rfid: str) -> bool:
if denylist and gw.cdv.validate(denylist, rfid):
gw.info(f"[OCPP] RFID {rfid!r} is present in denylist. Authorization denied.")
return False
if not allowlist:
gw.warn("[OCPP] No RFID allowlist configured โ rejecting all authorization requests.")
return False
return gw.cdv.validate(allowlist, rfid, validator=validator)
@app.websocket("/{path:path}")
async def websocket_ocpp(websocket: WebSocket, path: str):
global _csms_loop, _abnormal_status
_csms_loop = asyncio.get_running_loop()
charger_id = path.strip("/").split("/")[-1]
gw.info(f"[OCPP] WebSocket connected: charger_id={charger_id}")
protos = websocket.headers.get("sec-websocket-protocol", "").split(",")
protos = [p.strip() for p in protos if p.strip()]
if "ocpp1.6" in protos:
await websocket.accept(subprotocol="ocpp1.6")
else:
await websocket.accept()
_active_cons[charger_id] = websocket
try:
while True:
raw = await websocket.receive_text()
gw.info(f"[OCPP:{charger_id}] โ {raw}")
try:
msg = json.loads(raw)
except json.JSONDecodeError:
gw.warn(f"[OCPP:{charger_id}] Received non-JSON message: {raw!r}")
continue
if isinstance(msg, list) and msg[0] == 2:
message_id, action = msg[1], msg[2]
payload = msg[3] if len(msg) > 3 else {}
gw.debug(f"[OCPP:{charger_id}] Action={action} Payload={payload}")
response_payload = {}
if action == "Authorize":
status = "Accepted" if is_authorized_rfid(payload.get("idTag")) else "Rejected"
response_payload = {"idTagInfo": {"status": status}}
elif action == "BootNotification":
response_payload = {
"currentTime": datetime.utcnow().isoformat() + "Z",
"interval": 300,
"status": "Accepted"
}
elif action == "Heartbeat":
response_payload = {"currentTime": datetime.utcnow().isoformat() + "Z"}
elif action == "StartTransaction":
now = int(time.time())
transaction_id = now
_transactions[charger_id] = {
"syncStart": 1,
"connectorId": payload.get("connectorId"),
"idTagStart": payload.get("idTag"),
"meterStart": payload.get("meterStart"),
"reservationId": payload.get("reservationId", -1),
"startTime": now,
"startTimeStr": datetime.utcfromtimestamp(now).isoformat() + "Z",
"startMs": int(time.time() * 1000) % 1000,
"transactionId": transaction_id,
"MeterValues": []
}
response_payload = {
"transactionId": transaction_id,
"idTagInfo": {"status": "Accepted"}
}
if email:
subject = f"OCPP: Charger {charger_id} STARTED transaction {transaction_id}"
body = (
f"Charging session started.\n"
f"Charger: {charger_id}\n"
f"idTag: {payload.get('idTag')}\n"
f"Connector: {payload.get('connectorId')}\n"
f"Start Time: {datetime.utcfromtimestamp(now).isoformat()}Z\n"
f"Transaction ID: {transaction_id}\n"
f"Meter Start: {payload.get('meterStart')}\n"
f"Reservation ID: {payload.get('reservationId', -1)}"
)
gw.mail.send(subject, body, to=email)
elif action == "MeterValues":
tx = _transactions.get(charger_id)
if tx:
for entry in payload.get("meterValue", []):
ts = entry.get("timestamp")
ts_epoch = (
int(datetime.fromisoformat(ts.rstrip("Z")).timestamp())
if ts else int(time.time())
)
sampled = []
for sv in entry.get("sampledValue", []):
val = sv.get("value")
unit = sv.get("unit", "")
measurand = sv.get("measurand", "")
try:
fval = float(val)
if unit == "Wh":
fval = fval / 1000.0
sampled.append({
"value": fval,
"unit": "kWh" if unit == "Wh" else unit,
"measurand": measurand,
"context": sv.get("context", ""),
})
except Exception:
continue
tx["MeterValues"].append({
"timestamp": ts_epoch,
"timestampStr": datetime.utcfromtimestamp(ts_epoch).isoformat() + "Z",
"timeMs": int(time.time() * 1000) % 1000,
"sampledValue": sampled,
})
response_payload = {}
elif action == "StopTransaction":
now = int(time.time())
tx = _transactions.get(charger_id)
if tx:
if tx.get("MeterValues"):
try:
archive_e(charger_id, tx["transactionId"], tx["MeterValues"])
except Exception as e:
gw.error("Error recording energy chart.")
gw.exception(e)
tx.update({
"syncStop": 1,
"idTagStop": payload.get("idTag"),
"meterStop": payload.get("meterStop"),
"stopTime": now,
"stopTimeStr": datetime.utcfromtimestamp(now).isoformat() + "Z",
"stopMs": int(time.time() * 1000) % 1000,
"reason": 4,
"reasonStr": "Local",
})
if location:
file_path = gw.resource(
"work", "etron", "records", location,
f"{charger_id}_{tx['transactionId']}.dat"
)
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, "w") as f:
json.dump(tx, f, indent=2)
response_payload = {"idTagInfo": {"status": "Accepted"}}
elif action == "StatusNotification":
status = payload.get("status")
error_code = payload.get("errorCode")
info = payload.get("info", "")
# Only store if abnormal; remove if cleared
if is_abnormal_status(status, error_code):
_abnormal_status[charger_id] = {
"status": status,
"errorCode": error_code,
"info": info,
"timestamp": datetime.utcnow().isoformat() + "Z"
}
gw.warn(f"[OCPP] Abnormal status for {charger_id}: {status}/{error_code} - {info}")
else:
if charger_id in _abnormal_status:
gw.info(f"[OCPP] Status normalized for {charger_id}: {status}/{error_code}")
_abnormal_status.pop(charger_id, None)
response_payload = {}
else:
response_payload = {"status": "Accepted"}
response = [3, message_id, response_payload]
gw.info(f"[OCPP:{charger_id}] โ {action} => {response_payload}")
await websocket.send_text(json.dumps(response))
elif isinstance(msg, list) and msg[0] == 3:
# Handle CALLRESULT, check for Heartbeat ACK to record latest heartbeat time
payload = msg[2] if len(msg) > 2 else {}
if isinstance(payload, dict) and "currentTime" in payload:
# Only update for Heartbeat (or any other call with currentTime)
_latest_heartbeat[charger_id] = payload["currentTime"]
gw.debug(f"[OCPP:{charger_id}] Updated latest heartbeat to {_latest_heartbeat[charger_id]}")
continue
elif isinstance(msg, list) and msg[0] == 4:
gw.info(f"[OCPP:{charger_id}] Received CALLERROR: {msg}")
continue
else:
gw.warn(f"[OCPP:{charger_id}] Invalid or unsupported message format: {msg}")
except WebSocketDisconnect:
gw.info(f"[OCPP:{charger_id}] WebSocket disconnected")
except Exception as e:
gw.error(f"[OCPP:{charger_id}] WebSocket failure: {e}")
gw.debug(traceback.format_exc())
finally:
_active_cons.pop(charger_id, None)
return (app if not oapp else (oapp, app)) if _is_new_app else oapp