"""Преобразование данных""" import struct import numpy as np from project import DATA_RATE import model NORMAL_LENGTH = 32 # размер пакета Start_byte = b'\x55' # стартовый байт, нужен для парсинга сообщения Stop_byte = b'\x77' # стоповый байт, нужен для парсинга сообщения threshold_MIN = 100 # минимално возможная амплитуда выше которой ищем пики в базовом режиме threshold_MAX = 400 # максимально возможная амплитуда выше которой не сохраняем # методы чтения передаваемых данных def shift_or(one, two, three, four): """Чтение 4-ёх байтного параметра""" a = one << 24 | two << 16 | three << 8 | four return a def shift_or_24(two, three, four): """Чтение 3-ёх байтного параметра""" a = two << 16 | three << 8 | four return a def shift_or_16(three, four): """Чтение 2-ух байтного параметра""" a = three << 8 | four return a def get_data_bin(file, data_in): """ Получение данных из бинарного файла - записанных данных :param file: файл для чтения :param data_in: объект для хранения модели данных """ chunk = list(model.get_data_bin(file, data_in)) data_in.chunk = [item.to_bytes() for item in list(chunk)] def write_file(): """ Открытие на запись файла """ return model.write_file() def write_in_file(file, rx): """ Запись данных в файл :param file: файл :param rx: серия данных """ model.write_in_file(file, rx) def close_file(file): """ Закрыть файл :param file: файл """ return model.close_file(file) def create_mess(send_mode, param_code, param, size, type_param): """ Создание сообщения для передачи параметров :param send_mode: режим передачи данных 433 - 0(Bluetooth - 1) :param param_code: код для идентификации параметра для передачи(задано протоколом взаимодействия) :param param: передаваемое значение :param size: размер передаваемого значения :param type_param: тип передаваемого значения """ index = 2 if send_mode: index = 10 init_mes = [0x41, 0x54, 0x2b, 0x44, 0x41, 0x54, 0x41, 0x31, 0x55, param_code, 0x77, 0x0D, 0x0A] else: init_mes = [0x55, param_code, 0x77] param_in_bytes = [] if type_param == float: param = struct.unpack('i', struct.pack('f', param))[0] param_in_bytes = [int(((param & 0xFF000000)) >> 24), int(((param & 0xFF0000)) >> 16), int(((param & 0xFF00) >> 8)), int((param & 0xFF))] if type_param == int: if size == 4: param_in_bytes = [int(((param & 0xFF000000)) >> 24), int(((param & 0xFF0000)) >> 16), int(((param & 0xFF00) >> 8)), int((param & 0xFF))] if size == 3: param_in_bytes = [0, int(((param & 0xFF0000)) >> 16), int(((param & 0xFF00) >> 8)), int((param & 0xFF))] if size == 2: param_in_bytes = [0, 0, int(((param & 0xFF00) >> 8)), int((param & 0xFF))] if size == 1: param_in_bytes = [0, 0, 0, int((param & 0xFF))] for _ in param_in_bytes[::-1]: init_mes.insert(index, _) mes = bytes(init_mes) return mes def state_packet(send_mode, serial, param_code, param, size, type_param): """ Отправка пакета параметров :param send_mode: режим передачи данных 433 - 0(Bluetooth - 1) :param serial: канал передачи :param param_code: код для идентификации параметра для передачи(задано протоколом взаимодействия) :param param: передаваемое значение :param size: размер передаваемого значения :param type_param: тип передаваемого значения """ if serial.isOpen(): serial.writeData(create_mess(send_mode, param_code, param, size, type_param)) class DataClass: """ Класс модель данных для отображения на графике """ def __init__(self): # переменные для работы с потоком данных с устройства self.data_rx = [] # список в котором храним данные self.draw_cnt = np.uint16(0) # x-координата точки в текущий момент self.data_length = 0 # кол-во отсчетов для отрисовки графика по x-координате self.pac_count = 0 # счетчик для распознавания пакетов self.counter = [] # счетчик небитых пакетов # переменные для работы с записанным файлом self.i = 0 # счетчик для чтения файла self.chunk = [] # считанный массив из файла # переменные для отрисовки графиков # Сигнал по которому определяют ЧСС ЭКГ сигнал если у нас кардиограф # Правый желудочек (используется основной канал) self.ECG_RA = [] # np.zeros(self.data_length, np.int16) # позитивно отраженный сигнал self.ECG_RA_pos_sig = [] # np.zeros(self.data_length, np.int16) # динамический порог рассчитываемый в зависимости от нижнего и верхнего порогов self.ECG_RA_din_threshold = [] # np.zeros(self.data_length, np.int16) # нижний порог порог для метода треугольников при его пересечении считаем ЧСС событие self.ECG_RA_min_threshold = np.int16(threshold_MIN) # верхний порог для метода треугольников выше него мы не берём данные self.ECG_RA_max_threshold = np.int16(threshold_MAX) self.Last_RR_poz_rel = [] # время до прошлого события self.text_list = {} # список label на графике для динамического обновления self.last_period = [] # мгновенный период для отрисовки label self.rr_now = [] # есть ли удар сердца в текущий момент self.last_QRS = [] # тип последнего события Vsense = 0 нормальное сокращение # Vpace = 1 пришлось стимулировать сейчас не используем # Vnoise = 2 шумы в QRS комплексе(пока не реализовали) # переменные для отрисовки label (обозначение на графике событий) self.V_print = 0 # необходимость отображения события self.draw_cnt_last = 0 # x-координата события self.draw_last_period = 0 # мгновенный период для отображения self.draw_last_QRS = 0 # тип последнего события для отображения # переменные для передачи параметров стимулятору и отображения текущих значений self.dc_cut = 1 # вывод сигнала с постоянной составляющей или без self.channel = 3 # активный канал передачи self.signal = 0 # вывод сигнала с фильтром или без self.sd_card = 0 # запись на sd-карту self.hv_volt = 0 # напряжение на конденсаторе self.bat_volt = 0 # напряжение на батарее self.bat_pers = 0 # напряжение на батарее в процентах self.spi_pot_set = 0 # коэффициент усиления self.Work_Mode_pacemaker = 0 # текущий режим работы стимулятора self.redet_num = 0 # размер буфера редетекции self.redet_bad = 0 # порог буфера редетекции self.standby_timer = 0 # таймаут отключения self.hv_blind_time = 0 # время слепоты после разряда self.max_energy = 0 # максимальная энергия self.min_energy = 0 # минимальная энергия self.hv_step_number = 0 # кол-во ступеней высоковольтной терапии self.fibr_max_tres = 0 # порог корзины фибриляций self.tachy_2_tres = 0 # пороговое значение для тахикардии 2ст self.tachy_1_tres = 0 # пороговое значение для тахикардии 1ст self.fibr_tres = 0 # пороговое значение для фибрилляции self.max_search_time = 135 # длительность поиска максимума self.square_coef = 0.75 # порог чувствительности Т волны self.square_time = 350 # длительность защиты от T волны self.max_treshold = 8.0 # максимальная чувствительность self.min_treshold = 2.0 # минимальная чувствительность self.max_time = 1250 # максимальный размер RR интервала self.fibr_cnt = 0 # кол-во событий фибрилляции (прогресс-бар) self.norm_cnt = 0 # кол-во нормальных сокращений (прогресс-бар) self.tachy_1_cnt = 0 # кол-во событий тахикардии 1ст (прогресс-бар) self.tachy_2_cnt = 0 # кол-во событий тахикардии 2ст (прогресс-бар) self.hv_step_cnt = 0 # текущая ступень ВВ терапии self.last_period_stat = 0 # мгновенный период self.filt_period = 0 # усредненный период self.now_energy = 0 # текущая энергия self.Vn_cnt = 0 # счетчик Vn self.Vp_cnt = 0 # счетчик Vp self.Vs_cnt = 0 # счетчик Vs self.sub_mode = 0 # текущий подрежим терапии self.terapy_now = 0 # текущий режим терапии def clear_data(self): """ Очистить модель хранения данных """ self.__init__() # self.data_rx.clear() # self.data_length = 0 # self.counter = np.zeros(self.data_length, np.int16) # np.zeros(self.data_length, np.int16) # self.pac_count = 0 # self.draw_cnt = 0 # # self.i = 0 # self.chunk = [] # self.update_data(self.data_length) # # Сигнал по которому определяют ЧСС ЭКГ сигнал если у нас кардиограф и производная фильрованного ФПГ если часы # # Правый желудочек (используется основной канал) # self.ECG_RA = [] # # # позитивно отраженный сигнал # self.ECG_RA_pos_sig = [] # np.zeros(self.data_length, np.int16) # # # динамический порог рассчитываемый в зависимости от нижнего и верхнего порогов # self.ECG_RA_din_threshold = [] # np.zeros(self.data_length, np.int16) # # # нижний порог порог для метода треугольников при его пересечении считаем ЧСС событие # self.ECG_RA_min_threshold = np.int16(threshold_MIN) # # # верхний порог для метода треугольников выше него мы не берём данные # self.ECG_RA_max_threshold = np.int16(threshold_MAX) # счетчик пакетов для вывод на график(визуальный контроль) def update_data(self, data_length): """ Создание нулевых массивов для хранения в них даных для отображения :param data_length: длина массива (сколько значений поместится в окно) """ self.ECG_RA = np.zeros(data_length, np.float32) self.ECG_RA_pos_sig = np.zeros(data_length, np.float32) self.ECG_RA_din_threshold = np.zeros(data_length, np.float32) self.counter = np.zeros(data_length, np.float32) self.ECG_RA_min_threshold = np.zeros(data_length, np.float32) self.ECG_RA_max_threshold = np.zeros(data_length, np.float32) self.Last_RR_poz_rel = np.zeros(data_length, np.uint16) self.last_QRS = np.zeros(data_length, np.uint16) self.rr_now = np.zeros(data_length, np.uint16) self.last_period = np.zeros(data_length, np.uint16) def parse_list(self, data_in): """ Расшифровка полученной серии данных :param data_in: ссылка на объект модели данных """ self.data_rx = self.data_rx + list(data_in) if len(self.data_rx) >= NORMAL_LENGTH * 2: # парсить имеет смысл при длинне пакета больше минимальной while self.pac_count <= len(self.data_rx) - NORMAL_LENGTH: if (self.data_rx[self.pac_count] == Start_byte) and ( self.data_rx[self.pac_count + NORMAL_LENGTH - 1] == Stop_byte) \ and ((len(self.data_rx) - self.pac_count) >= NORMAL_LENGTH + 0): self.draw_cnt += 1 if self.draw_cnt >= self.data_length: self.draw_cnt = 0 self.text_list = {} self.parse_part_from_1_to_20_byte() if self.draw_cnt > 1: self.ECG_RA_min_threshold[self.draw_cnt] = self.ECG_RA_min_threshold[self.draw_cnt - 1] self.ECG_RA_max_threshold[self.draw_cnt] = self.ECG_RA_max_threshold[self.draw_cnt - 1] self.last_period[self.draw_cnt] = self.last_period[self.draw_cnt - 1] else: self.ECG_RA_min_threshold[self.draw_cnt] = self.ECG_RA_min_threshold[self.data_length - 1] self.ECG_RA_max_threshold[self.draw_cnt] = self.ECG_RA_max_threshold[self.data_length - 1] self.last_period[self.draw_cnt] = self.last_period[self.data_length - 1] self.parse_part_from_23_to_30_byte() if self.rr_now[self.draw_cnt] == 1: self.V_print = 1 self.draw_cnt_last = self.draw_cnt self.draw_last_period = self.last_period[self.draw_cnt] # в конце парсинга шагаем по массиву self.pac_count += NORMAL_LENGTH - 1 del self.data_rx[0:self.pac_count] self.pac_count = 0 self.pac_count += 1 def parse_part_from_1_to_20_byte(self): """ Парсинг с 1 по 20 байт """ self.counter[self.draw_cnt] = shift_or(int.from_bytes(self.data_rx[self.pac_count + 1], "big"), int.from_bytes(self.data_rx[self.pac_count + 2], "big"), int.from_bytes(self.data_rx[self.pac_count + 3], "big"), int.from_bytes(self.data_rx[self.pac_count + 4], "big")) int1 = (shift_or(int.from_bytes(self.data_rx[self.pac_count + 5], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 6], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 7], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 8], "big", signed=False))) self.ECG_RA_din_threshold[self.draw_cnt] = struct.unpack('f', int1.to_bytes(4, 'big'))[0] int1 = (shift_or(int.from_bytes(self.data_rx[self.pac_count + 9], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 10], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 11], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 12], "big", signed=False))) self.ECG_RA_pos_sig[self.draw_cnt] = struct.unpack('f', int1.to_bytes(4, 'big'))[0] int1 = (shift_or(int.from_bytes(self.data_rx[self.pac_count + 13], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 14], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 15], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 16], "big", signed=False))) self.ECG_RA[self.draw_cnt] = struct.unpack('f', int1.to_bytes(4, 'big'))[0] self.Last_RR_poz_rel[self.draw_cnt] = shift_or_16( int.from_bytes(self.data_rx[self.pac_count + 17], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 18], "big")) int1 = int.from_bytes(self.data_rx[self.pac_count + 19], "big") self.terapy_now = int1 & 0x3 self.sub_mode = (int1 & 0x1C) >> 2 int1 = int.from_bytes(self.data_rx[self.pac_count + 20], "big") self.last_QRS[self.draw_cnt] = int1 & 0x3 self.draw_last_QRS = int1 & 0x3 self.rr_now[self.draw_cnt] = (int1 & 0x4) >> 2 self.Work_Mode_pacemaker = (int1 & 0x60) >> 5 self.signal = (int1 & 0x80) >> 7 def parse_part_from_23_to_30_byte(self): """ Парсинг с 23 по 30 байт (передача значений в зависимости от остатка от деления счетчика) """ if self.counter[self.draw_cnt] % 10 == 0: int1 = (shift_or(int.from_bytes(self.data_rx[self.pac_count + 23], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 24], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 25], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 26], "big", signed=False))) self.ECG_RA_min_threshold[self.draw_cnt] = struct.unpack('f', int1.to_bytes(4, 'big'))[0] self.min_treshold = self.ECG_RA_min_threshold[self.draw_cnt] int1 = (shift_or(int.from_bytes(self.data_rx[self.pac_count + 27], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 28], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 29], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 30], "big", signed=False))) self.ECG_RA_max_threshold[self.draw_cnt] = struct.unpack('f', int1.to_bytes(4, 'big'))[0] self.max_treshold = self.ECG_RA_max_threshold[self.draw_cnt] if self.counter[self.draw_cnt] % 10 == 1: int1 = ( shift_or(int.from_bytes(self.data_rx[self.pac_count + 23], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 24], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 25], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 26], "big", signed=False))) self.square_coef = struct.unpack('f', int1.to_bytes(4, 'big'))[0] if self.counter[self.draw_cnt] % 10 == 2: self.square_time = ( shift_or_16(int.from_bytes(self.data_rx[self.pac_count + 29], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 30], "big", signed=False))) self.max_search_time = ( shift_or_16(int.from_bytes(self.data_rx[self.pac_count + 27], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 28], "big", signed=False))) if self.counter[self.draw_cnt] % 10 == 3: int1 = int.from_bytes(self.data_rx[self.pac_count + 24], "big") self.sd_card = (int1 & 0x4) >> 2 self.channel = int1 & 0x3 self.dc_cut = (int1 & 0x8) >> 3 self.spi_pot_set = int.from_bytes(self.data_rx[self.pac_count + 25], "big") self.bat_pers = int.from_bytes(self.data_rx[self.pac_count + 26], "big") self.bat_volt = ( shift_or_16(int.from_bytes(self.data_rx[self.pac_count + 27], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 28], "big", signed=False))) self.hv_volt = ( shift_or_16(int.from_bytes(self.data_rx[self.pac_count + 29], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 30], "big", signed=False))) if self.counter[self.draw_cnt] % 10 == 4: self.last_period[self.draw_cnt] = ( shift_or_16(int.from_bytes(self.data_rx[self.pac_count + 27], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 28], "big", signed=False))) self.last_period_stat = self.last_period[self.draw_cnt] self.max_time = ( shift_or_16(int.from_bytes(self.data_rx[self.pac_count + 25], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 26], "big", signed=False))) if self.counter[self.draw_cnt] % 10 == 5: self.Vs_cnt = ( shift_or_24(int.from_bytes(self.data_rx[self.pac_count + 23], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 24], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 25], "big", signed=False))) self.Vp_cnt = ( shift_or_24(int.from_bytes(self.data_rx[self.pac_count + 28], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 29], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 30], "big", signed=False))) self.Vn_cnt = ( shift_or_16(int.from_bytes(self.data_rx[self.pac_count + 26], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 27], "big", signed=False), )) if self.counter[self.draw_cnt] % 10 == 6: self.fibr_tres = ( shift_or_16(int.from_bytes(self.data_rx[self.pac_count + 23], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 24], "big", signed=False) )) self.tachy_2_tres = ( shift_or_16(int.from_bytes(self.data_rx[self.pac_count + 25], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 26], "big", signed=False) )) self.tachy_1_tres = ( shift_or_16(int.from_bytes(self.data_rx[self.pac_count + 27], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 28], "big", signed=False) )) if self.counter[self.draw_cnt] % 10 == 7: self.fibr_cnt = int.from_bytes(self.data_rx[self.pac_count + 25], "big") self.tachy_2_cnt = int.from_bytes(self.data_rx[self.pac_count + 26], "big") self.tachy_1_cnt = int.from_bytes(self.data_rx[self.pac_count + 27], "big") self.norm_cnt = int.from_bytes(self.data_rx[self.pac_count + 28], "big") self.fibr_max_tres = int.from_bytes(self.data_rx[self.pac_count + 29], "big") self.filt_period = ( shift_or_16(int.from_bytes(self.data_rx[self.pac_count + 23], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 24], "big", signed=False))) if self.counter[self.draw_cnt] % 10 == 8: self.now_energy = (shift_or_16(int.from_bytes(self.data_rx[self.pac_count + 27], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 28], "big", signed=False))) / 10 self.min_energy = (int.from_bytes(self.data_rx[self.pac_count + 26], "big", signed=False)) // 10 self.max_energy = (shift_or_16(int.from_bytes(self.data_rx[self.pac_count + 29], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 30], "big", signed=False))) // 10 int1 = int.from_bytes(self.data_rx[self.pac_count + 23], "big") self.hv_step_cnt = int1 & 0xF self.hv_step_number = (int1 & 0xF0) >> 4 if self.counter[self.draw_cnt] % 10 == 9: self.hv_blind_time = ( shift_or_16(int.from_bytes(self.data_rx[self.pac_count + 23], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 24], "big", signed=False))) self.standby_timer = ( shift_or(int.from_bytes(self.data_rx[self.pac_count + 27], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 28], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 29], "big", signed=False), int.from_bytes(self.data_rx[self.pac_count + 30], "big", signed=False) )) // 1000 int1 = int.from_bytes(self.data_rx[self.pac_count + 25], "big", signed=False) self.redet_bad = int1 & 0xF self.redet_num = (int1 & 0xF0) >> 4