project_inertial-control/lidar_module/lidar_reader.py

111 lines
3.3 KiB
Python

import serial
import time
import sqlite3
import struct
import signal
import os
# Константы
SERIAL_PORT = '/dev/ttyUSB0'
BAUD_RATE = 115200
DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'inertial_data.db')
# Заголовок пакета LDS02RR
HEADER = 0xFA
PACKET_SIZE = 22
# Глобальные переменные для завершения
running = True
conn = None
ser = None
def shutdown(signum, frame):
"""Обработчик сигнала завершения"""
global running
print("Завершение лидар модуля...")
running = False
# Перехватываем сигналы завершения
signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)
def init_db():
"""Создать таблицу если не существует и очистить"""
conn = sqlite3.connect(DB_PATH)
conn.execute('''CREATE TABLE IF NOT EXISTS lidar_data (
timestamp REAL,
angle REAL,
distance_mm REAL,
quality INTEGER
)''')
conn.execute('DELETE FROM lidar_data')
conn.commit()
return conn
def parse_packet(data):
"""Парсить один пакет данных лидара"""
if len(data) < PACKET_SIZE or data[0] != HEADER:
return None
index = data[1] - 0xA0
if index < 0 or index > 89:
return None
points = []
for i in range(4):
offset = 4 + i * 4
b0 = data[offset]
b1 = data[offset + 1]
b2 = data[offset + 2]
b3 = data[offset + 3]
invalid = (b1 >> 7) & 1
distance = b0 | ((b1 & 0x3F) << 8)
quality = b2 | (b3 << 8)
angle = index * 4 + i
if not invalid and distance > 0:
points.append((angle, distance, quality))
return points
def save_to_db():
"""Основной цикл записи данных в БД"""
global conn, ser, running
conn = init_db()
try:
ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
print("Подключено к лидару")
except Exception as e:
print(f"Ошибка подключения: {e}")
conn.close()
return
print("Запись данных лидара...")
buffer = bytearray()
start_time = time.time()
try:
while running:
data = ser.read(PACKET_SIZE)
if not data:
continue
buffer.extend(data)
while len(buffer) >= PACKET_SIZE:
if buffer[0] == HEADER:
packet = bytes(buffer[:PACKET_SIZE])
points = parse_packet(packet)
if points:
t = round(time.time() - start_time, 3)
for angle, distance, quality in points:
conn.execute(
'INSERT INTO lidar_data VALUES (?, ?, ?, ?)',
(t, angle, round(distance, 1), quality)
)
conn.commit()
buffer = buffer[PACKET_SIZE:]
else:
buffer.pop(0)
finally:
ser.close()
conn.close()
print("Лидар модуль остановлен")
save_to_db()