This repository has been archived on 2020-04-08. You can view files and clone it, but cannot push or open issues or pull requests.
Files
Fusion2016/code/lukas/TurnDetector.py

446 lines
11 KiB
Python
Executable File

import numpy as np
import sys
import scipy.integrate
import math
import argparse
from sklearn.decomposition import PCA
import scipy.signal as signal
def project(v1, v2):
"""
Project vector v1 on v2
Return projected vector
"""
p = [np.dot(a, g) / np.dot(g,g) for a,g in zip(v1, v2)]
p = np.array(p)
p = [p*g for p,g in zip(p, v2)]
p = np.array(p)
return p
def motion_axis(time, lin_accel, gravity, interval = 500):
"""
Returns the motion axis, which is the axis with the biggest variance
lin_accel -- Linear acceleration
gravity -- Gravity
Lin_accel and gravity should have equal length
"""
p = project(lin_accel, gravity)
#add time to vector p
p = np.array([time, p[:,0], p[:,1], p[:,2]]).T
start = 0
end = start + interval
end_time = p[:,0][-1] #last timestep
pca = PCA(n_components=1)
result = []
while start < end_time:
indices = np.where((p[:,0] >= start) & (p[:,0] < end))
Z = p[indices, 1:3][0]
Z[:,0] = signal.medfilt(Z[:,0],31)
Z[:,1] = signal.medfilt(Z[:,1],31)
pca.fit(Z)
x1 = pca.components_[0][0]
y1 = pca.components_[0][1]
result.append((end, x1, y1))
start += interval
end += interval
return np.array(result)
def angle_between(v1, v2):
l_a = np.linalg.norm(v1)
l_b = np.linalg.norm(v2)
cos_ab = np.dot(v1, v2 / (l_a * l_b))
angle = np.arccos(cos_ab) * 180/math.pi
return min([180 - angle, angle])
def rotate_data_fhws(data, data_t, rotation, rotation_t):
#Invert rotationmatrix
np.linalg.inv(rotation)
#Align rotation time according to data time
tmp = []
for t in data_t:
# Find indices of roation matrix that are earlier
#than the current time of the sensor value
ind = np.where(rotation_t <= t)[0]
#Use the last index
if len(ind) != 0:
tmp.append(ind[-1])
else:
tmp.append(0)
#Only use the values of the rotation matrix that are aligned with the sensor data
rotation = rotation[tmp]
# Multiply data with rotation matrix
rot_data = []
for i, row in enumerate(data):
row = np.append(row, 1)
rot_data.append(np.dot(rotation[i], row))
return np.array(rot_data)
def rotate_data_lukas(data, rotation):
#Invert rotationmatrix
np.linalg.inv(rotation)
rot_data = []
for i, row in enumerate(data):
row = np.append(row, 1)
rot_data.append(np.dot(rotation[i], row))
return np.array(rot_data)
def read_data(fname):
"""
Read the data out of the file provided by FHWS sensor reader app
"""
time = np.loadtxt(fname,
delimiter=";",
usecols=[0],
unpack=True)
f = open(fname, 'r')
lin_accel = []
gyros = []
rotations = []
gravity = []
start = time[0]
time = []
gyro_tmp = [0, 0, 0]
lin_accel_tmp = [0, 0, 0]
gravity_tmp = [0, 0, 9.81]
rotations_tmp = 16*[-1]
s = 0
for line in f:
line = line.split(";")
t = int(line[0]) - start
#Gyro-Data
if line[1] == "3":
gyro_tmp[0] = line[2]
gyro_tmp[1] = line[3]
gyro_tmp[2] = line[4]
#Linear Acceleration-Data
elif line[1] == "2":
lin_accel_tmp[0] = line[2]
lin_accel_tmp[1] = line[3]
lin_accel_tmp[2] = line[4]
#Gravity data
elif line[1] == "1":
gravity_tmp[0] = line[2]
gravity_tmp[1] = line[3]
gravity_tmp[2] = line[4]
#Rotation-Data
elif line[1] == "7":
for i in range(0,16):
rotations_tmp[i] = line[i+2]
if s != t:
gyros.append(gyro_tmp[:])
lin_accel.append(lin_accel_tmp[:])
gravity.append(gravity_tmp[:])
rotations.append(rotations_tmp[:])
time.append(t)
s = t
gyros = np.array(gyros, dtype=float)
lin_accel = np.array(lin_accel, dtype=float)
gravity = np.array(gravity, dtype=float)
rotations = np.array(rotations, dtype=float)
time = np.array(time, dtype = int)
#HACK
#In the first timestamps the rotation matrix is all zero, because
#no measurements are available yet.
#Avoid this by replacing these lines with the first measured
#rotation matrix
ind = np.where(rotations[:,0] == -1)[0]
if len(ind) != 0:
index = ind[-1] + 1
rotations[ind] = rotations[index]
#Reshape matrix
rotations = [row.reshape((4,4)) for row in rotations]
rotations = np.array(rotations)
return time, gyros, lin_accel, gravity, rotations
def detect_turns(time, signal, interval):
n_intervals = int(time[-1] / interval) + 1
result = []
for i in range(n_intervals):
start = i * interval
end = start + interval
tmp = integrate(start, end, zip(time, signal)) * 180.0/math.pi
result.append((end, tmp))
return np.array(result)
def integrate(time_from, time_to, signal):
"""Integrate signal from time_from to time_to. Signal should be two dimensional.
First dimension is the timestamp, second dimension is the signal value.
dt is the interval between two recordings
"""
sum = 0
last_time = 0
#go through signal
for value in signal:
#check if time is in the given timespan
if time_from <= value[0] < time_to:
#multiply value with dt and add it to the sum = integral
# sum += value[1] * dt
sum += value[1] * ((value[0] - last_time)/1000.)
last_time = value[0]
#sum is the integral over rad/s
return sum
def write_to_file(fname, turns, motion):
f = open(fname, 'w')
for index, t in enumerate(turns):
f.write(str(t[0]) + "," + str(t[1]) + "," + str(motion[index][1]) + "\n")
f.close()
def deg_to_rad(deg):
return deg * math.pi / 180.0
def rad_to_deg(rad):
return rad * 180.0 / math.pi
def main():
parser = argparse.ArgumentParser()
parser.add_argument("fname_sensor",
help = "Gyroscope file")
parser.add_argument("fname_output",
help = "Output file, where timestamps and angle of heading will be saved")
parser.add_argument("--time",
help = "Time interval, over which gyroscope will be integrated (default=500ms)",
type=int)
parser.add_argument("--rad",
help = "Output angles in rad (default in degree)",
action = "store_true")
parser.add_argument("--file_format",
help = "Sensor data file format [fhws|lukas] (default: fhws)",
type = str)
parser.add_argument("--cosy",
help = "Coordinate system of the gyroscope data [world|device] (default: device). If given in device, the data will automatically be rotated in world coordinates.",
type = str)
args = parser.parse_args()
#Choose between file format of sensor data and coordinate system
file_format = "fhws"
cosy = "device"
if args.file_format:
file_format = args.file_format
if args.cosy:
cosy = args.cosy
#My own data format
if file_format == "lukas":
delimiter = ","
time_cols = [40]
time = np.loadtxt(args.fname_sensor,
delimiter=delimiter,
usecols=time_cols,
skiprows = 1,
unpack=True)
if cosy == "device":
gyros_cols = [9, 10, 11]
lin_accel_cols = [6, 7, 8]
else:
gyros_cols = [34, 35,36]
lin_accel_cols = [37, 38, 39]
grav_cols = [3, 4, 5]
gyroX, gyroY, gyroZ = np.loadtxt(args.fname_sensor,
delimiter=delimiter,
usecols=gyros_cols,
skiprows = 1,
unpack=True)
rotation = np.loadtxt(args.fname_sensor,
delimiter = delimiter,
usecols=range(18,34),
skiprows=1,
unpack=True)
lin_accel_X, lin_accel_Y, lin_accel_Z = np.loadtxt(args.fname_sensor,
delimiter=delimiter,
usecols=lin_accel_cols,
skiprows=1,
unpack=True)
gravity_X, gravity_Y, gravity_Z = np.loadtxt(args.fname_sensor,
delimiter=delimiter,
usecols=grav_cols,
skiprows=1,
unpack=True)
rotation = rotation.T
rotation = [row.reshape((4,4)) for row in rotation]
# rotation = np.array(rotation).T
print rotation
gyro = np.array([gyroX, gyroY, gyroZ]).T
lin_accel = np.array([lin_accel_X, lin_accel_Y, lin_accel_Z]).T
gravity = np.array([gravity_X, gravity_Y, gravity_Z]).T
if cosy == "device":
world_gyro = rotate_data_lukas(gyro, rotation)
world_lin_accel = rotate_data_lukas(lin_accel, rotation)
else:
world_gyro = gyro
world_lin_accel = lin_accel
#FHWS file format
else:
time, gyro, lin_accel, gravity, rotation = read_data(args.fname_sensor)
if cosy == "device":
world_gyro = rotate_data_lukas(gyro, rotation)
world_lin_accel = rotate_data_lukas(lin_accel, rotation)
else:
print "Option 'fhws' in combination with 'world' not available"
return
gyroX = world_gyro[:,0]
gyroY = world_gyro[:,1]
gyroZ = world_gyro[:,2]
lin_accel_X = world_lin_accel[:,0]
lin_accel_Y = world_lin_accel[:,1]
lin_accel_Z = world_lin_accel[:,2]
#Parameters
#---------
time_interval = 500
if args.time:
time_interval = args.time
turns = detect_turns(time, gyroZ, time_interval)
motion = motion_axis(time, lin_accel, gravity, 500)
angles = []
for index, axis in enumerate(motion):
if index == 0:
angle = 0
else:
x_1 = motion[index-1][1]
y_1 = motion[index-1][2]
x_2 = axis[1]
y_2 = axis[2]
a = np.array([x_1, y_1])
b = np.array([x_2, y_2])
angle = angle_between(a,b)
angles.append((axis[0], angle))
np.set_printoptions(suppress=True)
turns = np.array(turns)
angles = np.array(angles)
if args.rad:
turns[:,1] = deg_to_rad(turns[:,1])
print "Sum of all angles: ", np.sum(turns[:,1])
write_to_file(args.fname_output, turns, angles)
if __name__ == "__main__":
main()