Help for ocpp.csms.power_consumed

Sample CLI

gway ocpp.csms power-consumed

Full Code

def power_consumed(tx):
    """Return the energy consumed by a transaction, in kWh.

    The preferred source is the transaction's ``MeterValues``: every sampled
    value whose measurand is ``Energy.Active.Import.Register`` is collected in
    order, and the result is the last reading minus the first one. Readings
    with unit ``"Wh"`` are divided by 1000; any other (or missing) unit is
    taken as already being in kWh.

    If no usable sampled reading exists, the function falls back to
    ``meterStop - meterStart``, each divided by 1000 (i.e. treated as Wh).

    Malformed data (a null ``MeterValues``, non-dict entries or samples,
    non-numeric values) is skipped rather than raising, so a partially broken
    record still yields the best available figure.

    Args:
        tx: Transaction record (a dict-like object with ``.get``), or a falsy
            value such as ``None`` / ``{}``.

    Returns:
        Consumed energy in kWh rounded to 3 decimals, or ``0.0`` when it
        cannot be determined.
    """
    if not tx:
        return 0.0

    # float() raises these for None/containers, bad strings and huge ints.
    conversion_errors = (TypeError, ValueError, OverflowError)

    # The key may be present but null, so do not rely on .get()'s default.
    meter_values = tx.get("MeterValues")
    if not isinstance(meter_values, (list, tuple)):
        meter_values = []

    energy_vals = []
    for entry in meter_values:
        # Each entry is expected to be a dict holding a "sampledValue" list.
        if not isinstance(entry, dict):
            continue
        sampled = entry.get("sampledValue")
        if not isinstance(sampled, (list, tuple)):
            continue
        for sv in sampled:
            if not isinstance(sv, dict):
                continue
            if sv.get("measurand") != "Energy.Active.Import.Register":
                continue
            try:
                val_f = float(sv.get("value"))
            except conversion_errors:
                # Missing or non-numeric reading: ignore this sample.
                continue
            if sv.get("unit") == "Wh":
                val_f = val_f / 1000.0
            # Any other unit (or none) is assumed to be kWh.
            energy_vals.append(val_f)

    if energy_vals:
        # Register readings are cumulative: consumption is last minus first.
        return round(energy_vals[-1] - energy_vals[0], 3)

    # Fallback to meterStart/meterStop when no sampled values were usable.
    meter_start = tx.get("meterStart")
    meter_stop = tx.get("meterStop")
    if meter_start is None or meter_stop is None:
        # Without both ends of the transaction nothing can be measured.
        return 0.0
    try:
        return round(float(meter_stop) / 1000.0 - float(meter_start) / 1000.0, 3)
    except conversion_errors:
        return 0.0