import serial import time import sqlite3 import struct import signal import os # Константы SERIAL_PORT = '/dev/ttyUSB0' BAUD_RATE = 57600 DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'inertial_data.db') # Заголовок пакета LDS02RR HEADER = 0xFA PACKET_SIZE = 22 # Глобальные переменные для завершения running = True conn = None ser = None def shutdown(signum, frame): """Обработчик сигнала завершения""" global running print("Завершение лидар модуля...") running = False # Перехватываем сигналы завершения signal.signal(signal.SIGTERM, shutdown) signal.signal(signal.SIGINT, shutdown) def init_db(): """Создать таблицу если не существует и очистить""" conn = sqlite3.connect(DB_PATH) conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA synchronous=NORMAL") conn.execute('''CREATE TABLE IF NOT EXISTS lidar_data ( timestamp REAL, angle REAL, distance_mm REAL, quality INTEGER )''') conn.execute('DELETE FROM lidar_data') conn.commit() return conn def parse_packet(data): """Парсить один пакет данных лидара""" if len(data) < PACKET_SIZE or data[0] != HEADER: return None index = data[1] - 0xA0 if index < 0 or index > 89: return None points = [] for i in range(4): offset = 4 + i * 4 b0 = data[offset] b1 = data[offset + 1] b2 = data[offset + 2] b3 = data[offset + 3] invalid = (b1 >> 7) & 1 distance = b0 | ((b1 & 0x3F) << 8) quality = b2 | (b3 << 8) angle = index * 4 + i if not invalid and distance > 0: points.append((angle, distance, quality)) return points def save_to_db(): """Основной цикл записи данных в БД""" global conn, ser, running conn = init_db() try: ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=0.1) print("Подключено к лидару") except Exception as e: print(f"Ошибка подключения: {e}") conn.close() return print("Запись данных лидара...") buffer = bytearray() start_time = time.time() try: while running: data = ser.read(PACKET_SIZE) if not data: continue buffer.extend(data) while len(buffer) >= PACKET_SIZE: if buffer[0] == HEADER: packet = bytes(buffer[:PACKET_SIZE]) points = parse_packet(packet) if points: t = round(time.time(), 3) for angle, distance, quality in points: conn.execute( 'INSERT INTO lidar_data VALUES (?, ?, ?, ?)', (t, angle, round(distance, 1), quality) ) conn.commit() buffer = buffer[PACKET_SIZE:] else: buffer.pop(0) finally: ser.close() conn.close() print("Лидар модуль остановлен") save_to_db()