Copy to clipboard async def simulate_cp(
cp_idx,
host,
ws_port,
rfid,
cp_path,
duration,
session_count
):
"""
Simulate a single CP session (possibly many times if session_count>1).
"""
cp_name = cp_path if session_count == 1 else f"{cp_path}{cp_idx+1}"
uri = f"ws://{host}:{ws_port}/{cp_name}"
try:
async with websockets.connect(uri, subprotocols=["ocpp1.6"]) as ws:
print(f"[Simulator:{cp_name}] Connected to {uri}")
async def listen_to_csms(stop_event):
try:
while True:
raw = await ws.recv()
print(f"[Simulator:{cp_name} ← CSMS] {raw}")
try:
msg = json.loads(raw)
except json.JSONDecodeError:
print(f"[Simulator:{cp_name}] Warning: Received non-JSON message")
continue
if isinstance(msg, list) and msg[0] == 2:
msg_id, action, payload = msg[1], msg[2], (msg[3] if len(msg) > 3 else {})
await ws.send(json.dumps([3, msg_id, {}]))
if action == "RemoteStopTransaction":
print(f"[Simulator:{cp_name}] Received RemoteStopTransaction → stopping transaction")
stop_event.set()
else:
print(f"[Simulator:{cp_name}] Notice: Unexpected message format", msg)
except websockets.ConnectionClosed:
print(f"[Simulator:{cp_name}] Connection closed by server")
stop_event.set()
loop_count = 0
while loop_count < session_count:
stop_event = asyncio.Event()
# Start listener for this session
listener = asyncio.create_task(listen_to_csms(stop_event))
# Initial handshake
await ws.send(json.dumps([2, "boot", "BootNotification", {
"chargePointModel": "Simulator",
"chargePointVendor": "SimVendor"
}]))
await ws.recv()
await ws.send(json.dumps([2, "auth", "Authorize", {"idTag": rfid}]))
await ws.recv()
# StartTransaction
meter_start = random.randint(1000, 2000)
await ws.send(json.dumps([2, "start", "StartTransaction", {
"connectorId": 1,
"idTag": rfid,
"meterStart": meter_start
}]))
resp = await ws.recv()
tx_id = json.loads(resp)[2].get("transactionId")
print(f"[Simulator:{cp_name}] Transaction {tx_id} started at meter {meter_start}")
# MeterValues loop
actual_duration = random.uniform(duration * 0.75, duration * 1.25)
interval = actual_duration / 10
meter = meter_start
for _ in range(10):
if stop_event.is_set():
print(f"[Simulator:{cp_name}] Stop event triggered—ending meter loop")
break
meter += random.randint(50, 150)
meter_kwh = meter / 1000.0
await ws.send(json.dumps([2, "meter", "MeterValues", {
"connectorId": 1,
"transactionId": tx_id,
"meterValue": [{
"timestamp": time.strftime('%Y-%m-%dT%H:%M:%S') + "Z",
"sampledValue": [{
"value": f"{meter_kwh:.3f}",
"measurand": "Energy.Active.Import.Register",
"unit": "kWh",
"context": "Sample.Periodic"
}]
}]
}]))
await asyncio.sleep(interval)
# StopTransaction
await ws.send(json.dumps([2, "stop", "StopTransaction", {
"transactionId": tx_id,
"idTag": rfid,
"meterStop": meter
}]))
await ws.recv()
print(f"[Simulator:{cp_name}] Transaction {tx_id} stopped at meter {meter}")
# Idle phase: send heartbeat and idle meter value
idle_time = 20 if session_count == 1 else 60
idle_counter = 0
next_meter = meter
last_meter_value = time.monotonic()
start_idle = time.monotonic()
while (time.monotonic() - start_idle) < idle_time and not stop_event.is_set():
await ws.send(json.dumps([2, "hb", "Heartbeat", {}]))
await asyncio.sleep(5)
idle_counter += 5
if time.monotonic() - last_meter_value >= 30:
next_meter += random.randint(0, 2)
next_meter_kwh = next_meter / 1000.0
await ws.send(json.dumps([2, "meter", "MeterValues", {
"connectorId": 1,
"meterValue": [{
"timestamp": time.strftime('%Y-%m-%dT%H:%M:%S') + "Z",
"sampledValue": [{
"value": f"{next_meter_kwh:.3f}",
"measurand": "Energy.Active.Import.Register",
"unit": "kWh",
"context": "Sample.Clock"
}]
}]
}]))
last_meter_value = time.monotonic()
print(f"[Simulator:{cp_name}] Idle MeterValues sent.")
listener.cancel()
try:
await listener
except asyncio.CancelledError:
pass
loop_count += 1
if session_count == float('inf'):
continue # loop forever
print(f"[Simulator:{cp_name}] Simulation ended.")
except Exception as e:
print(f"[Simulator:{cp_name}] Exception: {e}")