Моніторинг Fossibot, Aferiy через RS485.

Зарядні станції компанії SYD
Відповісти
bootuse
Site Admin
Повідомлень: 12
З нами з: 25 лютого 2026, 13:37

Моніторинг Fossibot, Aferiy через RS485.

Повідомлення bootuse »

Зарядні станції Fossibot, Aferiy використовують для обміну даними між компонентами станцій RS485.
Але протокол пропрієнтарний, то ж регістри не зівпадають з відомими протоколами.

Коротка програма на Python для моніторингу цих станцій на ПК.
Показує SOC, напругу на елементах, напругу на батареї, струм, різницю в напругах на елементах, температуру батареї.
Не підходить для F1800 - там інший виробник самої станції.
Тестування робив на Aferiy P180.
УВАГА! Потрібен RS485-USB адаптер і в програмі вказати правильний порт адаптера.
RS485-USB - https://s.click.aliexpress.com/e/_c4KYqcVz
Крім А та В ліній бажано підключити GND і не робіть довгими проводи.

І встановити компонент pyserial -
pip install pyserial
Текст програми.

Код: Виділити все

import serial
import time
import sys

# --- НАЛАШТУВАННЯ ---
PORT = 'COM2'
BAUD = 9600
BMS_MARKER = b'\x11\x04\x00\x00\x00\x3C'

# Налаштування фільтрації
HISTORY_SIZE = 10  # Кількість пакетів для аналізу (при оновленні 0.5с це ~5 секунд)
cells_history = [[] for _ in range(16)]

def find_cells_in_packet(packet):
    """Шукає блок із 16 напруг у пакеті"""
    for start_pos in range(len(packet) - 32):
        potential_cells = []
        is_valid = True
        for i in range(16):
            pos = start_pos + (i * 2)
            val = ((packet[pos] << 8) | packet[pos+1]) / 1000.0
            if not (2.8 < val < 3.8): # Рамки для LiFePO4 під навантаженням
                is_valid = False
                break
            potential_cells.append(val)
        if is_valid:
            return potential_cells, start_pos
    return None, None

def get_median_cells(new_cells):
    """Додає нові дані в історію та повертає медіанне (стабільне) значення"""
    global cells_history
    stable_cells = []
    for i in range(16):
        cells_history[i].append(new_cells[i])
        if len(cells_history[i]) > HISTORY_SIZE:
            cells_history[i].pop(0)
        
        # Сортуємо історію і беремо центральний елемент
        sorted_h = sorted(cells_history[i])
        median_val = sorted_h[len(sorted_h) // 2]
        stable_cells.append(median_val)
    return stable_cells

def parse_bms_data(packet):
    raw_cells, offset = find_cells_in_packet(packet)
    if not raw_cells:
        return

    # Отримуємо стабільні дані через фільтр
    cells = get_median_cells(raw_cells)

    try:
        # Основні параметри
        v_total = ((packet[42] << 8) | packet[43]) / 100.0
        curr_raw = (packet[44] << 8) | packet[45]
        current = (curr_raw if curr_raw < 32768 else curr_raw - 65536) / 100.0
        temp = packet[46]
        soc = packet[92]
        
        v_max, v_min = max(cells), min(cells)
        delta = v_max - v_min
        ts = time.strftime('%H:%M:%S')

        # Формуємо вивід
        print(f"\n[{ts}] ЗАРЯД: {soc}% | ТЕМП: {temp}°C | НАПРУГА: {v_total:.2f}V")
        print(f"СТРУМ: {current:.2f}A | ПОТУЖНІСТЬ: {abs(v_total * current):.1f}W")
        print("-" * 70)
        
        # Вивід комірок (2 рядки по 8)
        print(f"C01-08: {' | '.join([f'{c:.3f}' for c in cells[:8]])}")
        print(f"C09-16: {' | '.join([f'{c:.3f}' for c in cells[8:]])}")
        
        # Аналіз дельти
        status = f"СТАБІЛЬНА DELTA: {delta:.3f}V"
        if delta > 0.050: status += " [!] ПОТРЕБУЄ БАЛАНСУВАННЯ"
        print(status)
        print("=" * 70)

    except Exception as e:
        pass

def main():
    ser = None
    try:
        ser = serial.Serial(PORT, BAUD, timeout=0.01)
        print(f"--- МОНІТОРИНГ З МЕДІАННИМ ФІЛЬТРОМ ({PORT}) ---")
        print("Очікування стабілізації даних (5 сек)...")
        
        buffer = b''
        while True:
            if ser.in_waiting > 0:
                buffer += ser.read(ser.in_waiting)
                
                if BMS_MARKER in buffer:
                    idx = buffer.find(BMS_MARKER)
                    if len(buffer) >= idx + 128:
                        parse_bms_data(buffer[idx : idx + 128])
                        buffer = buffer[idx + 120:]
                
                if len(buffer) > 1000:
                    buffer = buffer[-200:]
            
            time.sleep(0.5)

    except KeyboardInterrupt:
        print("\n[!] Моніторинг зупинено.")
    except Exception as e:
        print(f"\n[!] Помилка: {e}")
    finally:
        if ser and ser.is_open:
            ser.close()
            print("[+] Порт закритий.")

if __name__ == "__main__":
    main()
Вкладення
COM2 Monitoring Session dump.zip
Дамп Serial Port Monitor
(41.8 Кіб) Завантажено 5 разів
Відповісти