Help for ocpp.evcs.simulate

Sample CLI

gway ocpp.evcs simulate

References

Full Code

def simulate(
    *,
    host: str = "[WEBSITE_HOST|127.0.0.1]",
    ws_port: int = "[WEBSOCKET_PORT|9000]",
    rfid: str = "FFFFFFFF",
    cp_path: str = "CPX",
    duration: int = 60,
    repeat=False,
    threads: int = None,
    daemon: bool = True,
):
    """
    Flexible OCPP charger simulator.
    - daemon=False: blocking, always returns after all runs.
    - daemon=True: returns a coroutine for orchestration, user is responsible for awaiting/cancelling.
    - threads: None/1 for one session; >1 to simulate multiple charge points.
    """
    host    = gw.resolve(host)
    ws_port = int(gw.resolve(ws_port))
    session_count = parse_repeat(repeat)
    n_threads = int(threads) if threads else 1

    async def orchestrate_all():
        stop_flags = [threading.Event() for _ in range(n_threads)]
        tasks = []
        threads_list = []

        async def run_task(idx):
            try:
                this_cp_path = _unique_cp_path(cp_path, idx, n_threads)
                await simulate_cp(
                    idx,
                    host,
                    ws_port,
                    rfid,
                    this_cp_path,
                    duration,
                    session_count
                )
            except Exception as e:
                print(f"[Simulator:coroutine:{idx}] Exception: {e}")

        def run_thread(idx, stop_flag):
            try:
                this_cp_path = _unique_cp_path(cp_path, idx, n_threads)
                asyncio.run(simulate_cp(
                    idx,
                    host,
                    ws_port,
                    rfid,
                    this_cp_path,
                    duration,
                    session_count
                ))
            except Exception as e:
                print(f"[Simulator:thread:{idx}] Exception: {e}")

        if n_threads == 1:
            tasks.append(asyncio.create_task(run_task(0)))
            try:
                await asyncio.gather(*tasks)
            except asyncio.CancelledError:
                print("[Simulator] Orchestration cancelled. Cancelling task(s)...")
                for t in tasks:
                    t.cancel()
                raise
        else:
            for idx in range(n_threads):
                t = threading.Thread(target=run_thread, args=(idx, stop_flags[idx]), daemon=True)
                t.start()
                threads_list.append(t)
            try:
                while any(t.is_alive() for t in threads_list):
                    await asyncio.sleep(0.5)
            except asyncio.CancelledError:
                gw.abort("[Simulator] Orchestration cancelled.")
            for t in threads_list:
                t.join()

    if daemon:
        return orchestrate_all()
    else:
        if n_threads == 1:
            asyncio.run(simulate_cp(0, host, ws_port, rfid, cp_path, duration, session_count))
        else:
            threads_list = []
            for idx in range(n_threads):
                this_cp_path = _unique_cp_path(cp_path, idx, n_threads)
                t = threading.Thread(target=_thread_runner, args=(
                    simulate_cp, idx, host, ws_port, rfid, this_cp_path, duration, session_count
                ), daemon=True)
                t.start()
                threads_list.append(t)
            for t in threads_list:
                t.join()