Help for ocpp.evcs.simulate_cp

Sample CLI

gway ocpp.evcs simulate-cp

Full Code

async def simulate_cp(
        cp_idx,
        host,
        ws_port,
        rfid,
        cp_path,
        duration,
        session_count
    ):
    """
    Simulate a single CP session (possibly many times if session_count>1).
    """
    cp_name = cp_path if session_count == 1 else f"{cp_path}{cp_idx+1}"
    uri     = f"ws://{host}:{ws_port}/{cp_name}"
    try:
        async with websockets.connect(uri, subprotocols=["ocpp1.6"]) as ws:
            print(f"[Simulator:{cp_name}] Connected to {uri}")

            async def listen_to_csms(stop_event):
                try:
                    while True:
                        raw = await ws.recv()
                        print(f"[Simulator:{cp_name} ← CSMS] {raw}")
                        try:
                            msg = json.loads(raw)
                        except json.JSONDecodeError:
                            print(f"[Simulator:{cp_name}] Warning: Received non-JSON message")
                            continue
                        if isinstance(msg, list) and msg[0] == 2:
                            msg_id, action, payload = msg[1], msg[2], (msg[3] if len(msg) > 3 else {})
                            await ws.send(json.dumps([3, msg_id, {}]))
                            if action == "RemoteStopTransaction":
                                print(f"[Simulator:{cp_name}] Received RemoteStopTransaction → stopping transaction")
                                stop_event.set()
                        else:
                            print(f"[Simulator:{cp_name}] Notice: Unexpected message format", msg)
                except websockets.ConnectionClosed:
                    print(f"[Simulator:{cp_name}] Connection closed by server")
                    stop_event.set()

            loop_count = 0
            while loop_count < session_count:
                stop_event = asyncio.Event()

                # Start listener for this session
                listener = asyncio.create_task(listen_to_csms(stop_event))

                # Initial handshake
                await ws.send(json.dumps([2, "boot", "BootNotification", {
                    "chargePointModel": "Simulator",
                    "chargePointVendor": "SimVendor"
                }]))
                await ws.recv()
                await ws.send(json.dumps([2, "auth", "Authorize", {"idTag": rfid}]))
                await ws.recv()

                # StartTransaction
                meter_start = random.randint(1000, 2000)
                await ws.send(json.dumps([2, "start", "StartTransaction", {
                    "connectorId": 1,
                    "idTag": rfid,
                    "meterStart": meter_start
                }]))
                resp = await ws.recv()
                tx_id = json.loads(resp)[2].get("transactionId")
                print(f"[Simulator:{cp_name}] Transaction {tx_id} started at meter {meter_start}")

                # MeterValues loop
                actual_duration = random.uniform(duration * 0.75, duration * 1.25)
                interval = actual_duration / 10
                meter = meter_start

                for _ in range(10):
                    if stop_event.is_set():
                        print(f"[Simulator:{cp_name}] Stop event triggered—ending meter loop")
                        break
                    meter += random.randint(50, 150)
                    meter_kwh = meter / 1000.0
                    await ws.send(json.dumps([2, "meter", "MeterValues", {
                        "connectorId": 1,
                        "transactionId": tx_id,
                        "meterValue": [{
                            "timestamp": time.strftime('%Y-%m-%dT%H:%M:%S') + "Z",
                            "sampledValue": [{
                                "value": f"{meter_kwh:.3f}",
                                "measurand": "Energy.Active.Import.Register",
                                "unit": "kWh",
                                "context": "Sample.Periodic"
                            }]
                        }]
                    }]))
                    await asyncio.sleep(interval)

                # StopTransaction
                await ws.send(json.dumps([2, "stop", "StopTransaction", {
                    "transactionId": tx_id,
                    "idTag": rfid,
                    "meterStop": meter
                }]))
                await ws.recv()
                print(f"[Simulator:{cp_name}] Transaction {tx_id} stopped at meter {meter}")

                # Idle phase: send heartbeat and idle meter value
                idle_time = 20 if session_count == 1 else 60
                idle_counter = 0
                next_meter = meter
                last_meter_value = time.monotonic()
                start_idle = time.monotonic()

                while (time.monotonic() - start_idle) < idle_time and not stop_event.is_set():
                    await ws.send(json.dumps([2, "hb", "Heartbeat", {}]))
                    await asyncio.sleep(5)
                    idle_counter += 5
                    if time.monotonic() - last_meter_value >= 30:
                        next_meter += random.randint(0, 2)
                        next_meter_kwh = next_meter / 1000.0
                        await ws.send(json.dumps([2, "meter", "MeterValues", {
                            "connectorId": 1,
                            "meterValue": [{
                                "timestamp": time.strftime('%Y-%m-%dT%H:%M:%S') + "Z",
                                "sampledValue": [{
                                    "value": f"{next_meter_kwh:.3f}",
                                    "measurand": "Energy.Active.Import.Register",
                                    "unit": "kWh",
                                    "context": "Sample.Clock"
                                }]
                            }]
                        }]))
                        last_meter_value = time.monotonic()
                        print(f"[Simulator:{cp_name}] Idle MeterValues sent.")

                listener.cancel()
                try:
                    await listener
                except asyncio.CancelledError:
                    pass

                loop_count += 1
                if session_count == float('inf'):
                    continue  # loop forever

            print(f"[Simulator:{cp_name}] Simulation ended.")
    except Exception as e:
        print(f"[Simulator:{cp_name}] Exception: {e}")