87 lines
2.7 KiB
Python
87 lines
2.7 KiB
Python
|
|
import serial
|
||
|
|
import csv
|
||
|
|
import time
|
||
|
|
import struct
|
||
|
|
|
||
|
|
# Константы
|
||
|
|
SERIAL_PORT = '/dev/ttyS0' # UART порт на MangoPi
|
||
|
|
BAUD_RATE = 230400 # скорость передачи LDS02RR
|
||
|
|
DURATION = 5 # секунд записи
|
||
|
|
|
||
|
|
# Заголовок пакета LDS02RR
|
||
|
|
HEADER = 0xFA
|
||
|
|
PACKET_SIZE = 22 # размер пакета в байтах
|
||
|
|
|
||
|
|
def parse_packet(data):
|
||
|
|
"""Парсить один пакет данных лидара"""
|
||
|
|
if len(data) < PACKET_SIZE:
|
||
|
|
return None
|
||
|
|
|
||
|
|
if data[0] != HEADER:
|
||
|
|
return None
|
||
|
|
|
||
|
|
# Индекс угла (0xA0 = 0°, 0xF9 = 359°)
|
||
|
|
index = data[1] - 0xA0
|
||
|
|
if index < 0 or index > 89:
|
||
|
|
return None
|
||
|
|
|
||
|
|
# Скорость вращения
|
||
|
|
speed = struct.unpack_from('<H', data, 2)[0] / 64.0
|
||
|
|
|
||
|
|
points = []
|
||
|
|
for i in range(4):
|
||
|
|
offset = 4 + i * 4
|
||
|
|
distance_raw = struct.unpack_from('<H', data, offset)[0]
|
||
|
|
quality = struct.unpack_from('<H', data, offset + 2)[0] >> 8
|
||
|
|
|
||
|
|
distance = distance_raw / 4.0 # в мм
|
||
|
|
angle = index * 4 + i # угол в градусах
|
||
|
|
|
||
|
|
points.append((angle, distance, quality))
|
||
|
|
|
||
|
|
return points
|
||
|
|
|
||
|
|
def save_to_csv(filename, duration_seconds):
|
||
|
|
"""Читать данные лидара и записывать в CSV"""
|
||
|
|
try:
|
||
|
|
ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
|
||
|
|
print(f"Подключено к {SERIAL_PORT}")
|
||
|
|
except Exception as e:
|
||
|
|
print(f"Ошибка подключения: {e}")
|
||
|
|
return
|
||
|
|
|
||
|
|
with open(filename, 'w', newline='') as f:
|
||
|
|
writer = csv.writer(f)
|
||
|
|
writer.writerow(['timestamp', 'angle', 'distance_mm', 'quality'])
|
||
|
|
|
||
|
|
start_time = time.time()
|
||
|
|
buffer = bytearray()
|
||
|
|
|
||
|
|
while time.time() - start_time < duration_seconds:
|
||
|
|
data = ser.read(PACKET_SIZE)
|
||
|
|
if not data:
|
||
|
|
continue
|
||
|
|
|
||
|
|
buffer.extend(data)
|
||
|
|
|
||
|
|
# Ищем заголовок пакета
|
||
|
|
while len(buffer) >= PACKET_SIZE:
|
||
|
|
if buffer[0] == HEADER:
|
||
|
|
packet = bytes(buffer[:PACKET_SIZE])
|
||
|
|
points = parse_packet(packet)
|
||
|
|
|
||
|
|
if points:
|
||
|
|
t = round(time.time() - start_time, 3)
|
||
|
|
for angle, distance, quality in points:
|
||
|
|
writer.writerow([t, angle,
|
||
|
|
round(distance, 1),
|
||
|
|
quality])
|
||
|
|
buffer = buffer[PACKET_SIZE:]
|
||
|
|
else:
|
||
|
|
buffer.pop(0)
|
||
|
|
|
||
|
|
ser.close()
|
||
|
|
print("Готово!")
|
||
|
|
|
||
|
|
# Запуск
|
||
|
|
save_to_csv('lidar.csv', DURATION)
|