def simulate(
*,
host: str = "[WEBSITE_HOST|127.0.0.1]",
ws_port: int = "[WEBSOCKET_PORT|9000]",
rfid: str = "FFFFFFFF",
cp_path: str = "CPX",
duration: int = 60,
repeat=False,
threads: int = None,
daemon: bool = True,
):
"""
Flexible OCPP charger simulator.
- daemon=False: blocking, always returns after all runs.
- daemon=True: returns a coroutine for orchestration, user is responsible for awaiting/cancelling.
- threads: None/1 for one session; >1 to simulate multiple charge points.
"""
host = gw.resolve(host)
ws_port = int(gw.resolve(ws_port))
session_count = parse_repeat(repeat)
n_threads = int(threads) if threads else 1
async def orchestrate_all():
stop_flags = [threading.Event() for _ in range(n_threads)]
tasks = []
threads_list = []
async def run_task(idx):
try:
this_cp_path = _unique_cp_path(cp_path, idx, n_threads)
await simulate_cp(
idx,
host,
ws_port,
rfid,
this_cp_path,
duration,
session_count
)
except Exception as e:
print(f"[Simulator:coroutine:{idx}] Exception: {e}")
def run_thread(idx, stop_flag):
try:
this_cp_path = _unique_cp_path(cp_path, idx, n_threads)
asyncio.run(simulate_cp(
idx,
host,
ws_port,
rfid,
this_cp_path,
duration,
session_count
))
except Exception as e:
print(f"[Simulator:thread:{idx}] Exception: {e}")
if n_threads == 1:
tasks.append(asyncio.create_task(run_task(0)))
try:
await asyncio.gather(*tasks)
except asyncio.CancelledError:
print("[Simulator] Orchestration cancelled. Cancelling task(s)...")
for t in tasks:
t.cancel()
raise
else:
for idx in range(n_threads):
t = threading.Thread(target=run_thread, args=(idx, stop_flags[idx]), daemon=True)
t.start()
threads_list.append(t)
try:
while any(t.is_alive() for t in threads_list):
await asyncio.sleep(0.5)
except asyncio.CancelledError:
gw.abort("[Simulator] Orchestration cancelled.")
for t in threads_list:
t.join()
if daemon:
return orchestrate_all()
else:
if n_threads == 1:
asyncio.run(simulate_cp(0, host, ws_port, rfid, cp_path, duration, session_count))
else:
threads_list = []
for idx in range(n_threads):
this_cp_path = _unique_cp_path(cp_path, idx, n_threads)
t = threading.Thread(target=_thread_runner, args=(
simulate_cp, idx, host, ws_port, rfid, this_cp_path, duration, session_count
), daemon=True)
t.start()
threads_list.append(t)
for t in threads_list:
t.join()