initial commit before ownership transfer
This commit is contained in:
278
code/lukas/StepDetector.py
Executable file
278
code/lukas/StepDetector.py
Executable file
@@ -0,0 +1,278 @@
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
from scipy.signal import argrelmax
|
||||
import sys
|
||||
import math
|
||||
import argparse
|
||||
|
||||
|
||||
def rotate_data_fhws(data, data_t, rotation, rotation_t):
|
||||
#Invert rotationmatrix
|
||||
np.linalg.inv(rotation)
|
||||
|
||||
#Align rotation time according to data time
|
||||
tmp = []
|
||||
for t in data_t:
|
||||
# Find indices of roation matrix that are earlier
|
||||
#than the current time of the sensor value
|
||||
ind = np.where(rotation_t <= t)[0]
|
||||
|
||||
#Use the last index
|
||||
if len(ind) != 0:
|
||||
tmp.append(ind[-1])
|
||||
else:
|
||||
tmp.append(0)
|
||||
|
||||
#Only use the values of the rotation matrix that are aligned with the sensor data
|
||||
rotation = rotation[tmp]
|
||||
|
||||
# Multiply data with rotationmatrix
|
||||
rot_data = []
|
||||
for i, row in enumerate(data):
|
||||
row = np.append(row, 1)
|
||||
rot_data.append(np.dot(rotation[i], row))
|
||||
|
||||
return np.array(rot_data)
|
||||
|
||||
|
||||
|
||||
|
||||
def rotate_data_lukas(data, rotation):
|
||||
#Invert rotationmatrix
|
||||
np.linalg.inv(rotation)
|
||||
|
||||
rot_data = []
|
||||
for i, row in enumerate(data):
|
||||
row = np.append(row, 1)
|
||||
rot_data.append(np.dot(rotation[i], row))
|
||||
|
||||
return np.array(rot_data)
|
||||
|
||||
def magnitude(x, y, z):
|
||||
ret = [math.sqrt(i) for i in (x**2 + y**2 + z**2)]
|
||||
mean = np.mean(ret)
|
||||
ret -= mean
|
||||
return ret
|
||||
|
||||
def count_steps(time, signal, lt, ht, dead):
|
||||
"""
|
||||
Find steps in the accelerometer signal.
|
||||
After a step was found, a "dead" period exists, where no step can be found again.
|
||||
This is to avoid too many steps
|
||||
|
||||
Parameters
|
||||
----------
|
||||
time: array_like
|
||||
Timestaps of accelerometer signal
|
||||
Must have same length as signal
|
||||
signal: array_like
|
||||
Accelerometer signal of all three axis.
|
||||
Must have same length as time
|
||||
lt: float
|
||||
Low threshold, which must be exceeded by the accelerometer signal to be counted as step
|
||||
ht: float
|
||||
High treshold, which must not be exceeded by the accelerometer signal to be counted as step
|
||||
dead: float
|
||||
After a step was detected, during the dead time no other step will be found.
|
||||
Given in milliseconds
|
||||
"""
|
||||
|
||||
time_signal = zip(time, signal)
|
||||
dead_time = 0
|
||||
|
||||
steps = []
|
||||
for i in time_signal:
|
||||
if lt < i[1] < ht and i[0] > dead_time:
|
||||
steps.append(i[0])
|
||||
dead_time = i[0] + dead
|
||||
|
||||
return np.array(steps)
|
||||
|
||||
|
||||
|
||||
|
||||
def write_steps_to_file(fname, steps):
|
||||
f = open(fname, 'w')
|
||||
|
||||
print steps
|
||||
for s in steps:
|
||||
f.write(str(s) + "\n")
|
||||
|
||||
f.close()
|
||||
|
||||
def plot_steps(time, signal, steps):
|
||||
plt.title("Step detection")
|
||||
plt.xlabel("ms")
|
||||
plt.ylabel("Accelerometer magnitude")
|
||||
plt.plot(time, signal, label="Accelerometer")
|
||||
|
||||
s = []
|
||||
for i,t in enumerate(time):
|
||||
if t in steps:
|
||||
s.append((t, signal[i]))
|
||||
|
||||
s = np.array(s)
|
||||
plt.plot(s[:,0], s[:,1], 'ro', label = "Steps")
|
||||
|
||||
|
||||
|
||||
plt.legend(numpoints=1)
|
||||
plt.show()
|
||||
|
||||
|
||||
|
||||
def read_data(fname):
|
||||
time = np.loadtxt(fname,
|
||||
delimiter=";",
|
||||
usecols=[0],
|
||||
unpack=True)
|
||||
|
||||
f = open(fname, 'r')
|
||||
|
||||
accls = []
|
||||
accls_t = []
|
||||
rotations = []
|
||||
rotations_t = []
|
||||
start = time[0]
|
||||
|
||||
for line in f:
|
||||
line = line.split(";")
|
||||
t = int(line[0]) - start
|
||||
|
||||
#Lin Accel
|
||||
if line[1] == "2":
|
||||
accls_t.append(t)
|
||||
accls.append((line[2], line[3], line[4]))
|
||||
|
||||
#Rotation
|
||||
elif line[1] == "7":
|
||||
rotations_t.append(t)
|
||||
rotations.append((line[2], line[3], line[4], line[5],
|
||||
line[6], line[7], line[8], line[9],
|
||||
line[10], line[11], line[12],line[13],
|
||||
line[14], line[15], line[16], line[17]))
|
||||
|
||||
|
||||
accls = np.array(accls, dtype=float)
|
||||
accls_t = np.array(accls_t, dtype=int)
|
||||
|
||||
rotations = np.array(rotations, dtype=float)
|
||||
rotations = [row.reshape((4,4)) for row in rotations]
|
||||
rotations = np.array(rotations)
|
||||
rotations_t = np.array(rotations_t, dtype=int)
|
||||
|
||||
return accls, accls_t, rotations, rotations_t
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("fname_sensor",
|
||||
help = "Accelerometer file")
|
||||
parser.add_argument("fname_output",
|
||||
help = "Output file, where timestamps of steps will be saved")
|
||||
parser.add_argument("--dead",
|
||||
help = "Time span (in ms) after a detected step in which no additional step will be detected (default=600)",
|
||||
type=int)
|
||||
parser.add_argument("--lt",
|
||||
help = "Low threshold, which must be exceeded by the accelerometer signal to be counted as step (default=1.5)",
|
||||
type=float)
|
||||
parser.add_argument("--ht",
|
||||
help = "High treshold, which must not be exceeded by the accelerometer signal to be counted as step(default=6.5)",
|
||||
type=float)
|
||||
parser.add_argument("--plot",
|
||||
help = "Plot step detection",
|
||||
action="store_true")
|
||||
parser.add_argument("--file_format",
|
||||
help = "Sensor data file format [fhws|lukas] (default: fhws)",
|
||||
type = str)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
file_format = "fhws"
|
||||
|
||||
if args.file_format:
|
||||
file_format = args.file_format
|
||||
|
||||
#My own data format
|
||||
if file_format == "lukas":
|
||||
|
||||
delimiter = ','
|
||||
time_cols = [40]
|
||||
accel_cols = [6,7,8]
|
||||
|
||||
time = np.loadtxt(args.fname_sensor,
|
||||
delimiter=delimiter,
|
||||
usecols=time_cols,
|
||||
skiprows=2,
|
||||
unpack=True)
|
||||
|
||||
accelX, accelY, accelZ = np.loadtxt(args.fname_sensor,
|
||||
delimiter=delimiter,
|
||||
usecols=accel_cols,
|
||||
skiprows=2,
|
||||
unpack=True)
|
||||
|
||||
rotation = np.loadtxt(args.fname_sensor,
|
||||
delimiter = delimiter,
|
||||
usecols=range(18,34),
|
||||
skiprows=1,
|
||||
unpack=True)
|
||||
|
||||
rotations = rotation.T
|
||||
rotations = [row.reshape((4,4)) for row in rotations]
|
||||
accl = np.array([accelX, accelY, accelZ]).T
|
||||
world_accl = rotate_data_lukas(accl, rotations)
|
||||
|
||||
#FHWS file format
|
||||
else:
|
||||
accls, time, rotation, rotation_t = read_data(args.fname_sensor)
|
||||
world_accl = rotate_data_fhws(accls, time, rotation, rotation_t)
|
||||
|
||||
accelX = world_accl[:,0]
|
||||
accelY = world_accl[:,1]
|
||||
accelZ = world_accl[:,2]
|
||||
|
||||
accel_mag = magnitude(accelX, accelY, accelZ)
|
||||
|
||||
lt = 1.5
|
||||
ht = 6.5
|
||||
dead = 600
|
||||
|
||||
if args.dead:
|
||||
dead = args.dead
|
||||
if args.lt:
|
||||
lt = args.lt
|
||||
if args.ht:
|
||||
ht = args.ht
|
||||
|
||||
steps = count_steps(time, accel_mag, lt, ht, dead)
|
||||
print("#Steps detected: ", len(steps))
|
||||
write_steps_to_file(args.fname_output, steps)
|
||||
|
||||
if args.plot:
|
||||
plot_steps(time, accel_mag, steps)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user