This repository has been archived on 2020-04-08. You can view files and clone it, but cannot push or open issues or pull requests.
Files
Fusion2016/code/lukas/StepDetector.py

279 lines
7.3 KiB
Python
Executable File

import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import argrelmax
import sys
import math
import argparse
def rotate_data_fhws(data, data_t, rotation, rotation_t):
#Invert rotationmatrix
np.linalg.inv(rotation)
#Align rotation time according to data time
tmp = []
for t in data_t:
# Find indices of roation matrix that are earlier
#than the current time of the sensor value
ind = np.where(rotation_t <= t)[0]
#Use the last index
if len(ind) != 0:
tmp.append(ind[-1])
else:
tmp.append(0)
#Only use the values of the rotation matrix that are aligned with the sensor data
rotation = rotation[tmp]
# Multiply data with rotationmatrix
rot_data = []
for i, row in enumerate(data):
row = np.append(row, 1)
rot_data.append(np.dot(rotation[i], row))
return np.array(rot_data)
def rotate_data_lukas(data, rotation):
#Invert rotationmatrix
np.linalg.inv(rotation)
rot_data = []
for i, row in enumerate(data):
row = np.append(row, 1)
rot_data.append(np.dot(rotation[i], row))
return np.array(rot_data)
def magnitude(x, y, z):
ret = [math.sqrt(i) for i in (x**2 + y**2 + z**2)]
mean = np.mean(ret)
ret -= mean
return ret
def count_steps(time, signal, lt, ht, dead):
"""
Find steps in the accelerometer signal.
After a step was found, a "dead" period exists, where no step can be found again.
This is to avoid too many steps
Parameters
----------
time: array_like
Timestaps of accelerometer signal
Must have same length as signal
signal: array_like
Accelerometer signal of all three axis.
Must have same length as time
lt: float
Low threshold, which must be exceeded by the accelerometer signal to be counted as step
ht: float
High treshold, which must not be exceeded by the accelerometer signal to be counted as step
dead: float
After a step was detected, during the dead time no other step will be found.
Given in milliseconds
"""
time_signal = zip(time, signal)
dead_time = 0
steps = []
for i in time_signal:
if lt < i[1] < ht and i[0] > dead_time:
steps.append(i[0])
dead_time = i[0] + dead
return np.array(steps)
def write_steps_to_file(fname, steps):
f = open(fname, 'w')
print steps
for s in steps:
f.write(str(s) + "\n")
f.close()
def plot_steps(time, signal, steps):
plt.title("Step detection")
plt.xlabel("ms")
plt.ylabel("Accelerometer magnitude")
plt.plot(time, signal, label="Accelerometer")
s = []
for i,t in enumerate(time):
if t in steps:
s.append((t, signal[i]))
s = np.array(s)
plt.plot(s[:,0], s[:,1], 'ro', label = "Steps")
plt.legend(numpoints=1)
plt.show()
def read_data(fname):
time = np.loadtxt(fname,
delimiter=";",
usecols=[0],
unpack=True)
f = open(fname, 'r')
accls = []
accls_t = []
rotations = []
rotations_t = []
start = time[0]
for line in f:
line = line.split(";")
t = int(line[0]) - start
#Lin Accel
if line[1] == "2":
accls_t.append(t)
accls.append((line[2], line[3], line[4]))
#Rotation
elif line[1] == "7":
rotations_t.append(t)
rotations.append((line[2], line[3], line[4], line[5],
line[6], line[7], line[8], line[9],
line[10], line[11], line[12],line[13],
line[14], line[15], line[16], line[17]))
accls = np.array(accls, dtype=float)
accls_t = np.array(accls_t, dtype=int)
rotations = np.array(rotations, dtype=float)
rotations = [row.reshape((4,4)) for row in rotations]
rotations = np.array(rotations)
rotations_t = np.array(rotations_t, dtype=int)
return accls, accls_t, rotations, rotations_t
def main():
parser = argparse.ArgumentParser()
parser.add_argument("fname_sensor",
help = "Accelerometer file")
parser.add_argument("fname_output",
help = "Output file, where timestamps of steps will be saved")
parser.add_argument("--dead",
help = "Time span (in ms) after a detected step in which no additional step will be detected (default=600)",
type=int)
parser.add_argument("--lt",
help = "Low threshold, which must be exceeded by the accelerometer signal to be counted as step (default=1.5)",
type=float)
parser.add_argument("--ht",
help = "High treshold, which must not be exceeded by the accelerometer signal to be counted as step(default=6.5)",
type=float)
parser.add_argument("--plot",
help = "Plot step detection",
action="store_true")
parser.add_argument("--file_format",
help = "Sensor data file format [fhws|lukas] (default: fhws)",
type = str)
args = parser.parse_args()
file_format = "fhws"
if args.file_format:
file_format = args.file_format
#My own data format
if file_format == "lukas":
delimiter = ','
time_cols = [40]
accel_cols = [6,7,8]
time = np.loadtxt(args.fname_sensor,
delimiter=delimiter,
usecols=time_cols,
skiprows=2,
unpack=True)
accelX, accelY, accelZ = np.loadtxt(args.fname_sensor,
delimiter=delimiter,
usecols=accel_cols,
skiprows=2,
unpack=True)
rotation = np.loadtxt(args.fname_sensor,
delimiter = delimiter,
usecols=range(18,34),
skiprows=1,
unpack=True)
rotations = rotation.T
rotations = [row.reshape((4,4)) for row in rotations]
accl = np.array([accelX, accelY, accelZ]).T
world_accl = rotate_data_lukas(accl, rotations)
#FHWS file format
else:
accls, time, rotation, rotation_t = read_data(args.fname_sensor)
world_accl = rotate_data_fhws(accls, time, rotation, rotation_t)
accelX = world_accl[:,0]
accelY = world_accl[:,1]
accelZ = world_accl[:,2]
accel_mag = magnitude(accelX, accelY, accelZ)
lt = 1.5
ht = 6.5
dead = 600
if args.dead:
dead = args.dead
if args.lt:
lt = args.lt
if args.ht:
ht = args.ht
steps = count_steps(time, accel_mag, lt, ht, dead)
print("#Steps detected: ", len(steps))
write_steps_to_file(args.fname_output, steps)
if args.plot:
plot_steps(time, accel_mag, steps)
if __name__ == "__main__":
main()