Моніторинг Fossibot, Aferiy через RS485.

Зарядні станції компанії SYD
Відповісти
bootuse
Site Admin
Повідомлень: 22
З нами з: 25 лютого 2026, 13:37

Моніторинг Fossibot, Aferiy через RS485.

Повідомлення bootuse »

Зарядні станції Fossibot, Aferiy використовують для обміну даними між компонентами станцій RS485.
Але протокол пропрієнтарний, то ж регістри не зівпадають з відомими протоколами.

Коротка програма на Python для моніторингу цих станцій на ПК.
Показує SOC, напругу на елементах, напругу на батареї, струм, різницю в напругах на елементах, температуру батареї.
Не підходить для F1800 - там інший виробник самої станції.
Тестування робив на Aferiy P180.
УВАГА! Потрібен RS485-USB адаптер і в програмі вказати правильний порт адаптера.
RS485-USB - https://s.click.aliexpress.com/e/_c4KYqcVz
Крім А та В ліній бажано підключити GND і не робіть довгими проводи.

І встановити компонент pyserial -
pip install pyserial
Текст програми.

Код: Виділити все

import serial
import time

# --- НАЛАШТУВАННЯ ---
PORT = 'COM2'
BAUD = 9600
BMS_MARKER = b'\x11\x04\x00\x00' # Короткий маркер для надійності

# Налаштування фільтрації
HISTORY_SIZE = 7  # Глибина пам'яті (пакетів)
cells_history = [[] for _ in range(16)]

def find_cells_in_packet(packet):
    """Шукає блок напруг, ігноруючи пакети з FF FF"""
    # Зазвичай напруги починаються після заголовка (з 3-го або 6-го байту)
    # Шукаємо першу адекватну напругу в діапазоні 2.5V - 3.8V
    for start_pos in range(len(packet) - 32):
        potential_cells = []
        is_valid = True
        for i in range(16):
            pos = start_pos + (i * 2)
            raw_val = (packet[pos] << 8) | packet[pos+1]
            
            # Якщо бачимо FF FF (65535) або 0 - пакет "сміттєвий"
            if raw_val == 0xFFFF or raw_val == 0:
                is_valid = False
                break
                
            val = raw_val / 1000.0
            if not (2.5 < val < 4.0):
                is_valid = False
                break
            potential_cells.append(val)
            
        if is_valid:
            return potential_cells
    return None

def get_stable_cells(new_cells):
    """Медіанний фільтр: вибирає середнє значення з історії"""
    global cells_history
    stable = []
    for i in range(16):
        cells_history[i].append(new_cells[i])
        if len(cells_history[i]) > HISTORY_SIZE:
            cells_history[i].pop(0)
        
        # Сортуємо і беремо центральне значення (медіану)
        tmp_sorted = sorted(cells_history[i])
        stable.append(tmp_sorted[len(tmp_sorted)//2])
    return stable

def parse_bms_data(packet):
    raw_cells = find_cells_in_packet(packet)
    
    # Якщо в пакеті замість напруг FF FF - просто ігноруємо його
    if not raw_cells:
        return

    cells = get_stable_cells(raw_cells)

    try:
        # Парсимо загальні дані (адреси зміщені відносно початку пакета 11 04)
        v_total = ((packet[42] << 8) | packet[43]) / 100.0
        curr_raw = (packet[44] << 8) | packet[45]
        current = (curr_raw if curr_raw < 32768 else curr_raw - 65536) / 100.0
        temp = packet[46]
        soc = packet[92]
        
        v_max, v_min = max(cells), min(cells)
        delta = v_max - v_min
        ts = time.strftime('%H:%M:%S')

        # Вивід у консоль
        print(f"\n[{ts}] SOC: {soc}% | ТЕМП: {temp}°C | V_TOTAL: {v_total:.2f}V")
        print(f"СТРУМ: {current:.2f}A | ПОТУЖНІСТЬ: {abs(v_total * current):.1f}W")
        print("-" * 72)
        
        # Форматування комірок 2x8
        for row in range(2):
            line = ""
            for col in range(8):
                idx = row * 8 + col
                line += f"C{idx+1:02d}:{cells[idx]:.3f}V  "
            print(line.strip())
            
        print(f"REAL DELTA: {delta:.3f}V")
        print("=" * 72)

    except Exception:
        pass

def main():
    try:
        ser = serial.Serial(PORT, BAUD, timeout=0.05)
        print(f"--- АНАЛІЗАТОР AFERIY ЗАПУЩЕНО ({PORT}) ---")
        buffer = b''
        
        while True:
            if ser.in_waiting > 0:
                buffer += ser.read(ser.in_waiting)
                
                # Шукаємо початок пакета BMS
                if BMS_MARKER in buffer:
                    idx = buffer.find(BMS_MARKER)
                    # Пакет зазвичай має 120-128 байт
                    if len(buffer) >= idx + 124:
                        parse_bms_data(buffer[idx : idx + 124])
                        buffer = buffer[idx + 100:] # Зсуваємо буфер
                
                # Очищення старого буфера
                if len(buffer) > 2000:
                    buffer = buffer[-500:]
            
            time.sleep(0.1)
            
    except KeyboardInterrupt:
        print("\nЗупинка...")
    finally:
        if 'ser' in locals() and ser:
            ser.close()

if __name__ == "__main__":
    main()
Вкладення
COM2 Monitoring Session dump.zip
Дамп Serial Port Monitor
(41.8 Кіб) Завантажено 151 раз
Відповісти