Help for ocpp.csms.setup_app

Sample CLI

gway ocpp.csms setup-app

References

Full Code

def setup_app(*,
    app=None,
    allowlist=None,
    denylist=None,
    location=None,
    authorize=authorize_balance,
    email=None,
):
    """Attach a catch-all OCPP 1.6 (JSON over WebSocket) endpoint to a FastAPI app.

    The registered handler takes the last path segment of the connection URL
    as the charger id, answers incoming CALL frames (message type 2), records
    ``currentTime`` from CALLRESULT frames (type 3) and logs CALLERROR frames
    (type 4). State is kept in the module-level registries ``_transactions``,
    ``_active_cons``, ``_abnormal_status`` and ``_latest_heartbeat``, which
    are mutated in place.

    Args:
        app: Object handed to ``gw.unwrap_one(app, FastAPI)``. When that
            yields nothing, a fresh ``FastAPI()`` instance is created.
        allowlist: Passed to ``gw.cdv.validate`` to decide whether an idTag is
            accepted. When falsy, every ``Authorize`` request is rejected.
        denylist: Passed to ``gw.cdv.validate``; a match rejects the idTag
            before the allowlist is consulted.
        location: When truthy, a finished transaction is dumped as JSON to
            ``work/etron/records/<location>/<charger_id>_<transactionId>.dat``.
        authorize: Extra validator for allowlist checks. A string is looked up
            through ``gw[authorize]``; a callable is used directly; anything
            else disables the extra validator.
        email: Recipient of the "transaction started" notification. Non-string
            values are replaced by ``gw.resolve('[ADMIN_EMAIL]')``; a falsy
            result disables the notification.

    Returns:
        If no FastAPI app could be unwrapped from ``app``: the newly created
        app when ``app`` was falsy, otherwise the tuple ``(app, new_app)``.
        If one was unwrapped: the original ``app`` argument, unchanged.
    """
    email = email if isinstance(email, str) else gw.resolve('[ADMIN_EMAIL]')

    oapp = app
    from datetime import timezone as _timezone
    from fastapi import FastAPI as _FastAPI
    if (_is_new_app := not (app := gw.unwrap_one(app, _FastAPI))):
        app = _FastAPI()

    validator = None
    if isinstance(authorize, str):
        validator = gw[authorize]
    elif callable(authorize):
        validator = authorize

    def is_authorized_rfid(rfid: str) -> bool:
        """Return True when *rfid* passes the denylist and allowlist checks."""
        if denylist and gw.cdv.validate(denylist, rfid):
            gw.info(f"[OCPP] RFID {rfid!r} is present in denylist. Authorization denied.")
            return False
        if not allowlist:
            gw.warn("[OCPP] No RFID allowlist configured — rejecting all authorization requests.")
            return False
        return gw.cdv.validate(allowlist, rfid, validator=validator)

    def utc_iso(epoch: int) -> str:
        """Format an epoch-seconds value as a UTC ISO-8601 string ending in 'Z'."""
        return datetime.utcfromtimestamp(epoch).isoformat() + "Z"

    def current_ms() -> int:
        """Return the millisecond component of the current wall-clock time."""
        return int(time.time() * 1000) % 1000

    def meter_epoch(ts, charger_id: str) -> int:
        """Convert a MeterValues timestamp to epoch seconds.

        A trailing 'Z' (or a missing offset) is interpreted as UTC so the
        result does not depend on the server's local timezone. Missing or
        unparseable timestamps fall back to the current time.
        """
        if not ts:
            return int(time.time())
        try:
            parsed = datetime.fromisoformat(ts.rstrip("Z"))
        except (AttributeError, TypeError, ValueError):
            gw.warn(f"[OCPP:{charger_id}] Unparseable MeterValues timestamp {ts!r}; using current time.")
            return int(time.time())
        if parsed.tzinfo is None:
            # Naive datetimes would otherwise be treated as *local* time by
            # .timestamp(), skewing the value on non-UTC hosts.
            parsed = parsed.replace(tzinfo=_timezone.utc)
        return int(parsed.timestamp())

    def normalise_samples(raw_samples) -> list:
        """Convert sampledValue entries to floats, expressing Wh readings in kWh."""
        samples = []
        for sv in raw_samples or []:
            unit = sv.get("unit", "")
            try:
                fval = float(sv.get("value"))
            except (TypeError, ValueError, OverflowError):
                # Non-numeric readings are skipped rather than aborting the batch.
                continue
            if unit == "Wh":
                fval = fval / 1000.0
            samples.append({
                "value": fval,
                "unit": "kWh" if unit == "Wh" else unit,
                "measurand": sv.get("measurand", ""),
                "context": sv.get("context", ""),
            })
        return samples

    @app.websocket("/{path:path}")
    async def websocket_ocpp(websocket: WebSocket, path: str):
        """Serve one charger connection until it disconnects or fails."""
        global _csms_loop
        _csms_loop = asyncio.get_running_loop()

        charger_id = path.strip("/").split("/")[-1]
        gw.info(f"[OCPP] WebSocket connected: charger_id={charger_id}")

        # Echo the ocpp1.6 subprotocol only when the client offered it.
        offered = websocket.headers.get("sec-websocket-protocol", "").split(",")
        protos = [p.strip() for p in offered if p.strip()]
        if "ocpp1.6" in protos:
            await websocket.accept(subprotocol="ocpp1.6")
        else:
            await websocket.accept()

        _active_cons[charger_id] = websocket

        try:
            while True:
                raw = await websocket.receive_text()
                gw.info(f"[OCPP:{charger_id}] → {raw}")
                try:
                    msg = json.loads(raw)
                except json.JSONDecodeError:
                    gw.warn(f"[OCPP:{charger_id}] Received non-JSON message: {raw!r}")
                    continue

                # Reject anything that is not a non-empty JSON array up front,
                # so a malformed frame cannot raise and tear down the socket.
                if not isinstance(msg, list) or not msg:
                    gw.warn(f"[OCPP:{charger_id}] Invalid or unsupported message format: {msg}")
                    continue

                msg_type = msg[0]

                if msg_type == 2:
                    if len(msg) < 3:
                        gw.warn(f"[OCPP:{charger_id}] Malformed CALL frame (missing id or action): {msg}")
                        continue
                    message_id, action = msg[1], msg[2]
                    payload = msg[3] if len(msg) > 3 else {}
                    gw.debug(f"[OCPP:{charger_id}] Action={action} Payload={payload}")
                    if not isinstance(payload, dict):
                        gw.warn(f"[OCPP:{charger_id}] Non-object payload for {action}; treating it as empty.")
                        payload = {}

                    response_payload = {}

                    if action == "Authorize":
                        status = "Accepted" if is_authorized_rfid(payload.get("idTag")) else "Rejected"
                        response_payload = {"idTagInfo": {"status": status}}

                    elif action == "BootNotification":
                        response_payload = {
                            "currentTime": datetime.utcnow().isoformat() + "Z",
                            "interval": 300,
                            "status": "Accepted"
                        }

                    elif action == "Heartbeat":
                        response_payload = {"currentTime": datetime.utcnow().isoformat() + "Z"}

                    elif action == "StartTransaction":
                        now = int(time.time())
                        # The start time in epoch seconds doubles as the transaction id.
                        transaction_id = now
                        _transactions[charger_id] = {
                            "syncStart": 1,
                            "connectorId": payload.get("connectorId"),
                            "idTagStart": payload.get("idTag"),
                            "meterStart": payload.get("meterStart"),
                            "reservationId": payload.get("reservationId", -1),
                            "startTime": now,
                            "startTimeStr": utc_iso(now),
                            "startMs": current_ms(),
                            "transactionId": transaction_id,
                            "MeterValues": []
                        }
                        response_payload = {
                            "transactionId": transaction_id,
                            "idTagInfo": {"status": "Accepted"}
                        }

                        if email:
                            subject = f"OCPP: Charger {charger_id} STARTED transaction {transaction_id}"
                            body = (
                                f"Charging session started.\n"
                                f"Charger: {charger_id}\n"
                                f"idTag: {payload.get('idTag')}\n"
                                f"Connector: {payload.get('connectorId')}\n"
                                f"Start Time: {utc_iso(now)}\n"
                                f"Transaction ID: {transaction_id}\n"
                                f"Meter Start: {payload.get('meterStart')}\n"
                                f"Reservation ID: {payload.get('reservationId', -1)}"
                            )
                            # The notification is best-effort: a mail failure
                            # must not stop the charger from getting its reply.
                            try:
                                gw.mail.send(subject, body, to=email)
                            except Exception as e:
                                gw.error(f"[OCPP:{charger_id}] Error sending start notification e-mail.")
                                gw.exception(e)

                    elif action == "MeterValues":
                        tx = _transactions.get(charger_id)
                        if tx:
                            for entry in payload.get("meterValue") or []:
                                ts_epoch = meter_epoch(entry.get("timestamp"), charger_id)
                                tx["MeterValues"].append({
                                    "timestamp": ts_epoch,
                                    "timestampStr": utc_iso(ts_epoch),
                                    "timeMs": current_ms(),
                                    "sampledValue": normalise_samples(entry.get("sampledValue")),
                                })

                    elif action == "StopTransaction":
                        now = int(time.time())
                        tx = _transactions.get(charger_id)
                        if tx:
                            if tx.get("MeterValues"):
                                try:
                                    archive_e(charger_id, tx["transactionId"], tx["MeterValues"])
                                except Exception as e:
                                    gw.error("Error recording energy chart.")
                                    gw.exception(e)
                            tx.update({
                                "syncStop": 1,
                                "idTagStop": payload.get("idTag"),
                                "meterStop": payload.get("meterStop"),
                                "stopTime": now,
                                "stopTimeStr": utc_iso(now),
                                "stopMs": current_ms(),
                                "reason": 4,
                                "reasonStr": "Local",
                            })
                            if location:
                                # Persisting the record is best-effort, like the
                                # energy-chart archive above.
                                try:
                                    file_path = gw.resource(
                                        "work", "etron", "records", location,
                                        f"{charger_id}_{tx['transactionId']}.dat"
                                    )
                                    os.makedirs(os.path.dirname(file_path), exist_ok=True)
                                    with open(file_path, "w", encoding="utf-8") as f:
                                        json.dump(tx, f, indent=2)
                                except Exception as e:
                                    gw.error(f"[OCPP:{charger_id}] Error writing transaction record.")
                                    gw.exception(e)
                        response_payload = {"idTagInfo": {"status": "Accepted"}}

                    elif action == "StatusNotification":
                        status = payload.get("status")
                        error_code = payload.get("errorCode")
                        info = payload.get("info", "")
                        # Only store if abnormal; remove if cleared
                        if is_abnormal_status(status, error_code):
                            _abnormal_status[charger_id] = {
                                "status": status,
                                "errorCode": error_code,
                                "info": info,
                                "timestamp": datetime.utcnow().isoformat() + "Z"
                            }
                            gw.warn(f"[OCPP] Abnormal status for {charger_id}: {status}/{error_code} - {info}")
                        elif charger_id in _abnormal_status:
                            gw.info(f"[OCPP] Status normalized for {charger_id}: {status}/{error_code}")
                            _abnormal_status.pop(charger_id, None)

                    else:
                        # Unknown actions are acknowledged generically.
                        response_payload = {"status": "Accepted"}

                    response = [3, message_id, response_payload]
                    gw.info(f"[OCPP:{charger_id}] ← {action} => {response_payload}")
                    await websocket.send_text(json.dumps(response))

                elif msg_type == 3:
                    # CALLRESULT: any payload carrying currentTime updates the
                    # charger's latest-heartbeat record.
                    payload = msg[2] if len(msg) > 2 else {}
                    if isinstance(payload, dict) and "currentTime" in payload:
                        _latest_heartbeat[charger_id] = payload["currentTime"]
                        gw.debug(f"[OCPP:{charger_id}] Updated latest heartbeat to {_latest_heartbeat[charger_id]}")

                elif msg_type == 4:
                    gw.info(f"[OCPP:{charger_id}] Received CALLERROR: {msg}")

                else:
                    gw.warn(f"[OCPP:{charger_id}] Invalid or unsupported message format: {msg}")

        except WebSocketDisconnect:
            gw.info(f"[OCPP:{charger_id}] WebSocket disconnected")
        except Exception as e:
            gw.error(f"[OCPP:{charger_id}] WebSocket failure: {e}")
            gw.debug(traceback.format_exc())
        finally:
            # Always drop the connection from the active registry.
            _active_cons.pop(charger_id, None)

    return (app if not oapp else (oapp, app)) if _is_new_app else oapp