-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathradarmonitor.py
More file actions
102 lines (83 loc) · 3.49 KB
/
radarmonitor.py
File metadata and controls
102 lines (83 loc) · 3.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# -----------------------------------------------------------------------------
# Project: Alien Motion Tracker
# Copyright (c) 2025 RobSmithDev
#
# License: Non-Commercial Copyleft with Attribution (NCCL)
# Videos: https://www.youtube.com/playlist?list=PL18CvD80w43YAV8UG24NtwRc2Wy-i7yyd
# Build Guide: https://alien.robsmithdev.co.uk
#
# Summary:
# - Free for personal, academic, and research use.
# - Derivative works must use the same license, publish their source, and credit
# the original author.
# - Commercial use is NOT permitted without a separate license.
#
# Full license terms: see the LICENSE file or LICENSE_SUMMARY.md in this repo.
# -----------------------------------------------------------------------------
import threading,os
from multiprocessing import Process, Queue
from radar.radar_dev import RadarDev
from radar.signal_proc import SigProc
from radar.helper import find_setting_in_directory
import time
#import numpy as np
from multiprocessing import Process, SimpleQueue
from queue import Empty
from threadpoolctl import threadpool_limits
# this runs in a seperate process
def radarCalc(queueFromProcess: Queue, queueToProcess: Queue):
# Initialize device using the config
config = find_setting_in_directory("radar/config")
radar = RadarDev(config)
# Initialize signal processing
sig_proc = SigProc(radar.cfg)
threshold = 0.5
giro = [0, 0]
while True:
# Consume everything coming in
try:
threshold, gx, gy = queueFromProcess.get(timeout = 0.01)
giro = [max(abs(gx),giro[0]),max(abs(gy),giro[1])]
while True:
threshold, gx, gy = queueFromProcess.get(block=False)
giro = [max(abs(gx),giro[0]),max(abs(gy),giro[1])]
except Empty:
pass
# Now run the signal processing
frame = radar.get_next_frame()
if frame is None:
time.sleep(0.05)
else:
locations = sig_proc.update_with_sensitivity(frame, s=threshold, vx_mps=giro[0], vy_mps=giro[1])
queueToProcess.put(locations)
giro[0] = giro[0] / 2
giro[1] = giro[1] / 2
class RadarMonitor:
def __init__(self):
self.threshold = 0.5
self._lock = threading.Lock()
self.giro = [0, 0]
self.processorQueueToProcess = Queue(maxsize=40)
self.processorQueueFromProcess = Queue(maxsize=4)
self.processor = Process(target=radarCalc, args=(self.processorQueueToProcess,self.processorQueueFromProcess,), daemon=True)
self.processor.start()
def receive_gyro(self, x, y, z, yaw):
self.giro = [abs(x),abs(y)]
#print(float((np.hypot(self.giro[0], self.giro[1]))))
# Return all detections since last call
def getDetections(self):
ret = []
try:
while True:
ret.extend(self.processorQueueFromProcess.get(block=False))
except Empty:
pass
return ret
def setSensitivity(self, sense):
# If you put the slider in the other way around, remove the 1.0-
self.threshold = 0.5 if 0.48 < sense < 0.52 else sense
def pushUpdate(self):
try:
self.processorQueueToProcess.put([self.threshold, self.giro[0], self.giro[1]], block=False)
except Exception:
pass