handle data receive errors

This commit is contained in:
Данила Горнушко 2024-05-27 17:45:21 +03:00
parent f4d82224a3
commit c9f1c0968e

View file

@ -8,6 +8,9 @@ use std::io::{BufRead, BufReader, Read, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use hound::{WavSpec, SampleFormat, WavWriter};
use std::fs::File;
use std::time::{SystemTime, UNIX_EPOCH};
const MAX_DATA_LEN: usize = 1024 * 2 + 4 + 2;
@ -51,7 +54,7 @@ fn main() {
port.set_timeout(calculate_timeout(args.baud_rate, 30))
.unwrap();
let command = format!("@{} name\n", args.device_id);
let response = process_command(&mut port, command).unwrap();
let response = process_command(&mut port, command).unwrap_or_default();
debug!("We got some response: {}", response);
if response.contains("'HAILSENS'") {
info!("Detected hailsens.")
@ -63,7 +66,7 @@ fn main() {
let winsize = get_winsize(&mut port, args.device_id).expect("Couldn't get winsize") as usize;
debug!("Expected buf size is: {}", exp_buf_size(winsize));
let spec = hound::WavSpec {
let spec = WavSpec {
channels: 1,
sample_rate: get_sample_rate(&mut port, args.device_id),
bits_per_sample: 16,
@ -89,12 +92,7 @@ fn main() {
let cur_chunk_id = match get_stats(&mut port, args.device_id) {
Err(e) => {
error!("Couldn't read adcstats: {}", e);
err_counter += 1;
if err_counter > 10 {
panic!("Too many error! Exiting...")
}
error!("Trying again in 500ms");
std::thread::sleep(Duration::from_millis(500));
handle_error_with_timeout(&mut err_counter);
continue;
}
Ok(value) => {
@ -104,14 +102,16 @@ fn main() {
};
if cur_chunk_id != last_chunk_id {
info!("New data available: {}", cur_chunk_id);
last_chunk_id = cur_chunk_id;
info!("Getting new chunk!");
match get_chunk(&mut port, args.device_id, args.baud_rate, winsize) {
Some(data) => {
info!("Received new data with len: {}", data.len());
err_counter = err_counter.saturating_sub(1);
last_chunk_id = cur_chunk_id;
}
_ => {
error!("Couldn't receive new data!");
handle_error_with_timeout(&mut err_counter);
}
}
}
@ -119,9 +119,18 @@ fn main() {
}
}
fn handle_error_with_timeout(err_counter: &mut u32) {
*err_counter += 5;
if *err_counter > 30 {
panic!("Too many error! Exiting...")
}
error!("Trying again in 1 second...");
std::thread::sleep(Duration::from_millis(1_000));
}
fn get_stats(port: &mut Box<dyn SerialPort>, id: u32) -> Result<u32> {
let command = format!("@{} adcstats\n", id);
let response = process_command(port, command).unwrap();
let response = process_command(port, command).unwrap_or_default();
let mut split = response.split_whitespace();
let num_since_startup: u32 = split.next().context("Missing value")?.parse()?;
let _num_since_last_read: u32 = split.next().context("Missing value")?.parse()?;
@ -133,7 +142,7 @@ fn get_stats(port: &mut Box<dyn SerialPort>, id: u32) -> Result<u32> {
fn get_winsize(port: &mut Box<dyn SerialPort>, id: u32) -> Option<u32> {
debug!("Getting window size...");
let command = format!("@{} winsize\n", id);
let response = process_command(port, command).unwrap();
let response = process_command(port, command).unwrap_or_default();
let winsize_str = response.split_whitespace().nth(3);
match winsize_str {
Some(num_str) => {
@ -150,7 +159,7 @@ fn get_winsize(port: &mut Box<dyn SerialPort>, id: u32) -> Option<u32> {
fn get_threshold(port: &mut Box<dyn SerialPort>, id: u32) -> Option<u32> {
let command = format!("@{} adcthresh\n", id);
let response = process_command(port, command).unwrap();
let response = process_command(port, command).unwrap_or_default();
let number_str = response.split_whitespace().nth(3);
match number_str {
Some(num_str) => {
@ -167,7 +176,7 @@ fn get_threshold(port: &mut Box<dyn SerialPort>, id: u32) -> Option<u32> {
fn get_sample_rate(port: &mut Box<dyn SerialPort>, id: u32) -> u32 {
let command = format!("@{} srate\n", id);
let response = process_command(port, command).unwrap();
let response = process_command(port, command).unwrap_or_default();
let mut split = response.split_whitespace();
let adc_div: u32 = split
.next()
@ -223,7 +232,7 @@ fn get_sample_rate(port: &mut Box<dyn SerialPort>, id: u32) -> u32 {
fallback_freq
} else {
let adc_div_val = adc_div_dict[&adc_div];
let adc_sample_time_val = sample_times[&adc_sample_time].unwrap();
let adc_sample_time_val = sample_times[&adc_sample_time].unwrap_or_default();
let res = ((16_000_000.0 / adc_div_val as f64) / (adc_sample_time_val + 12.5)) as u32;
info!(
"Using calculated ADC sample rate: {}",
@ -346,3 +355,10 @@ fn header_is_ok(vec: &[u8]) -> bool {
fn exp_buf_size(win_size: usize) -> usize {
win_size * 2 + 6
}
fn current_timestamp_ms() -> u128 {
let start = SystemTime::now();
let since_the_epoch = start.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
since_the_epoch.as_millis()
}