import serial import time import sqlite3 import struct import signal # Константы SERIAL_PORT = '/dev/ttyS0' BAUD_RATE = 230400 DB_PATH = '../inertial_data.db' # Заголовок пакета LDS02RR HEADER = 0xFA PACKET_SIZE = 22 # Глобальные переменные для завершения running = True conn = None ser = None def shutdown(signum, frame): """Обработчик сигнала завершения""" global running print("Завершение лидар модуля...") running = False # Перехватываем сигналы завершения signal.signal(signal.SIGTERM, shutdown) signal.signal(signal.SIGINT, shutdown) def init_db(): """Создать таблицу если не существует и очистить""" conn = sqlite3.connect(DB_PATH) conn.execute('''CREATE TABLE IF NOT EXISTS lidar_data ( timestamp REAL, angle REAL, distance_mm REAL, quality INTEGER )''') conn.execute('DELETE FROM lidar_data') conn.commit() return conn def parse_packet(data): """Парсить один пакет данных лидара""" if len(data) < PACKET_SIZE or data[0] != HEADER: return None index = data[1] - 0xA0 if index < 0 or index > 89: return None points = [] for i in range(4): offset = 4 + i * 4 distance_raw = struct.unpack_from('> 2 distance = distance_raw / 4.0 angle = index * 4 + i points.append((angle, distance, quality)) return points def save_to_db(): """Основной цикл записи данных в БД""" global conn, ser, running conn = init_db() try: ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1) print("Подключено к лидару") except Exception as e: print(f"Ошибка подключения: {e}") conn.close() return print("Запись данных лидара...") buffer = bytearray() start_time = time.time() try: while running: data = ser.read(PACKET_SIZE) if not data: continue buffer.extend(data) while len(buffer) >= PACKET_SIZE: if buffer[0] == HEADER: packet = bytes(buffer[:PACKET_SIZE]) points = parse_packet(packet) if points: t = round(time.time() - start_time, 3) for angle, distance, quality in points: conn.execute( 'INSERT INTO lidar_data VALUES (?, ?, ?, ?)', (t, angle, round(distance, 1), quality) ) conn.commit() buffer = buffer[PACKET_SIZE:] else: buffer.pop(0) finally: ser.close() conn.close() print("Лидар модуль остановлен") save_to_db()