446 lines
11 KiB
Python
Executable File
446 lines
11 KiB
Python
Executable File
import numpy as np
|
|
import sys
|
|
import scipy.integrate
|
|
import math
|
|
import argparse
|
|
from sklearn.decomposition import PCA
|
|
import scipy.signal as signal
|
|
|
|
|
|
def project(v1, v2):
|
|
"""
|
|
Project vector v1 on v2
|
|
Return projected vector
|
|
"""
|
|
|
|
p = [np.dot(a, g) / np.dot(g,g) for a,g in zip(v1, v2)]
|
|
p = np.array(p)
|
|
|
|
p = [p*g for p,g in zip(p, v2)]
|
|
p = np.array(p)
|
|
|
|
return p
|
|
|
|
|
|
def motion_axis(time, lin_accel, gravity, interval = 500):
|
|
"""
|
|
Returns the motion axis, which is the axis with the biggest variance
|
|
lin_accel -- Linear acceleration
|
|
gravity -- Gravity
|
|
|
|
Lin_accel and gravity should have equal length
|
|
"""
|
|
|
|
p = project(lin_accel, gravity)
|
|
|
|
#add time to vector p
|
|
p = np.array([time, p[:,0], p[:,1], p[:,2]]).T
|
|
|
|
start = 0
|
|
end = start + interval
|
|
end_time = p[:,0][-1] #last timestep
|
|
|
|
pca = PCA(n_components=1)
|
|
|
|
result = []
|
|
while start < end_time:
|
|
indices = np.where((p[:,0] >= start) & (p[:,0] < end))
|
|
Z = p[indices, 1:3][0]
|
|
Z[:,0] = signal.medfilt(Z[:,0],31)
|
|
Z[:,1] = signal.medfilt(Z[:,1],31)
|
|
|
|
pca.fit(Z)
|
|
|
|
x1 = pca.components_[0][0]
|
|
y1 = pca.components_[0][1]
|
|
|
|
result.append((end, x1, y1))
|
|
|
|
start += interval
|
|
end += interval
|
|
|
|
return np.array(result)
|
|
|
|
|
|
|
|
def angle_between(v1, v2):
|
|
|
|
l_a = np.linalg.norm(v1)
|
|
l_b = np.linalg.norm(v2)
|
|
cos_ab = np.dot(v1, v2 / (l_a * l_b))
|
|
angle = np.arccos(cos_ab) * 180/math.pi
|
|
|
|
return min([180 - angle, angle])
|
|
|
|
|
|
|
|
def rotate_data_fhws(data, data_t, rotation, rotation_t):
|
|
#Invert rotationmatrix
|
|
np.linalg.inv(rotation)
|
|
|
|
#Align rotation time according to data time
|
|
tmp = []
|
|
for t in data_t:
|
|
# Find indices of roation matrix that are earlier
|
|
#than the current time of the sensor value
|
|
ind = np.where(rotation_t <= t)[0]
|
|
|
|
#Use the last index
|
|
if len(ind) != 0:
|
|
tmp.append(ind[-1])
|
|
else:
|
|
tmp.append(0)
|
|
|
|
#Only use the values of the rotation matrix that are aligned with the sensor data
|
|
rotation = rotation[tmp]
|
|
|
|
# Multiply data with rotation matrix
|
|
rot_data = []
|
|
for i, row in enumerate(data):
|
|
row = np.append(row, 1)
|
|
rot_data.append(np.dot(rotation[i], row))
|
|
|
|
return np.array(rot_data)
|
|
|
|
|
|
|
|
|
|
def rotate_data_lukas(data, rotation):
|
|
#Invert rotationmatrix
|
|
np.linalg.inv(rotation)
|
|
|
|
rot_data = []
|
|
for i, row in enumerate(data):
|
|
row = np.append(row, 1)
|
|
rot_data.append(np.dot(rotation[i], row))
|
|
|
|
return np.array(rot_data)
|
|
|
|
|
|
|
|
|
|
def read_data(fname):
|
|
"""
|
|
Read the data out of the file provided by FHWS sensor reader app
|
|
"""
|
|
|
|
|
|
time = np.loadtxt(fname,
|
|
delimiter=";",
|
|
usecols=[0],
|
|
unpack=True)
|
|
|
|
|
|
f = open(fname, 'r')
|
|
|
|
lin_accel = []
|
|
gyros = []
|
|
rotations = []
|
|
gravity = []
|
|
start = time[0]
|
|
time = []
|
|
|
|
gyro_tmp = [0, 0, 0]
|
|
lin_accel_tmp = [0, 0, 0]
|
|
gravity_tmp = [0, 0, 9.81]
|
|
rotations_tmp = 16*[-1]
|
|
|
|
s = 0
|
|
for line in f:
|
|
line = line.split(";")
|
|
t = int(line[0]) - start
|
|
|
|
|
|
#Gyro-Data
|
|
if line[1] == "3":
|
|
gyro_tmp[0] = line[2]
|
|
gyro_tmp[1] = line[3]
|
|
gyro_tmp[2] = line[4]
|
|
|
|
|
|
#Linear Acceleration-Data
|
|
elif line[1] == "2":
|
|
lin_accel_tmp[0] = line[2]
|
|
lin_accel_tmp[1] = line[3]
|
|
lin_accel_tmp[2] = line[4]
|
|
|
|
|
|
#Gravity data
|
|
elif line[1] == "1":
|
|
gravity_tmp[0] = line[2]
|
|
gravity_tmp[1] = line[3]
|
|
gravity_tmp[2] = line[4]
|
|
|
|
|
|
#Rotation-Data
|
|
elif line[1] == "7":
|
|
for i in range(0,16):
|
|
rotations_tmp[i] = line[i+2]
|
|
|
|
|
|
if s != t:
|
|
gyros.append(gyro_tmp[:])
|
|
lin_accel.append(lin_accel_tmp[:])
|
|
gravity.append(gravity_tmp[:])
|
|
rotations.append(rotations_tmp[:])
|
|
time.append(t)
|
|
s = t
|
|
|
|
|
|
gyros = np.array(gyros, dtype=float)
|
|
lin_accel = np.array(lin_accel, dtype=float)
|
|
gravity = np.array(gravity, dtype=float)
|
|
rotations = np.array(rotations, dtype=float)
|
|
time = np.array(time, dtype = int)
|
|
|
|
#HACK
|
|
#In the first timestamps the rotation matrix is all zero, because
|
|
#no measurements are available yet.
|
|
#Avoid this by replacing these lines with the first measured
|
|
#rotation matrix
|
|
ind = np.where(rotations[:,0] == -1)[0]
|
|
if len(ind) != 0:
|
|
index = ind[-1] + 1
|
|
rotations[ind] = rotations[index]
|
|
|
|
#Reshape matrix
|
|
rotations = [row.reshape((4,4)) for row in rotations]
|
|
rotations = np.array(rotations)
|
|
|
|
|
|
return time, gyros, lin_accel, gravity, rotations
|
|
|
|
|
|
|
|
def detect_turns(time, signal, interval):
|
|
|
|
n_intervals = int(time[-1] / interval) + 1
|
|
result = []
|
|
|
|
for i in range(n_intervals):
|
|
start = i * interval
|
|
end = start + interval
|
|
|
|
tmp = integrate(start, end, zip(time, signal)) * 180.0/math.pi
|
|
result.append((end, tmp))
|
|
|
|
return np.array(result)
|
|
|
|
|
|
|
|
def integrate(time_from, time_to, signal):
|
|
"""Integrate signal from time_from to time_to. Signal should be two dimensional.
|
|
First dimension is the timestamp, second dimension is the signal value.
|
|
dt is the interval between two recordings
|
|
"""
|
|
sum = 0
|
|
last_time = 0
|
|
|
|
#go through signal
|
|
for value in signal:
|
|
#check if time is in the given timespan
|
|
if time_from <= value[0] < time_to:
|
|
#multiply value with dt and add it to the sum = integral
|
|
# sum += value[1] * dt
|
|
sum += value[1] * ((value[0] - last_time)/1000.)
|
|
last_time = value[0]
|
|
|
|
#sum is the integral over rad/s
|
|
return sum
|
|
|
|
|
|
|
|
def write_to_file(fname, turns, motion):
|
|
f = open(fname, 'w')
|
|
|
|
for index, t in enumerate(turns):
|
|
f.write(str(t[0]) + "," + str(t[1]) + "," + str(motion[index][1]) + "\n")
|
|
|
|
f.close()
|
|
|
|
|
|
|
|
def deg_to_rad(deg):
|
|
return deg * math.pi / 180.0
|
|
|
|
|
|
def rad_to_deg(rad):
|
|
return rad * 180.0 / math.pi
|
|
|
|
|
|
|
|
def main():
|
|
|
|
parser = argparse.ArgumentParser()
|
|
|
|
parser.add_argument("fname_sensor",
|
|
help = "Gyroscope file")
|
|
|
|
parser.add_argument("fname_output",
|
|
help = "Output file, where timestamps and angle of heading will be saved")
|
|
|
|
parser.add_argument("--time",
|
|
help = "Time interval, over which gyroscope will be integrated (default=500ms)",
|
|
type=int)
|
|
|
|
parser.add_argument("--rad",
|
|
help = "Output angles in rad (default in degree)",
|
|
action = "store_true")
|
|
|
|
parser.add_argument("--file_format",
|
|
help = "Sensor data file format [fhws|lukas] (default: fhws)",
|
|
type = str)
|
|
|
|
parser.add_argument("--cosy",
|
|
help = "Coordinate system of the gyroscope data [world|device] (default: device). If given in device, the data will automatically be rotated in world coordinates.",
|
|
type = str)
|
|
|
|
|
|
args = parser.parse_args()
|
|
|
|
|
|
#Choose between file format of sensor data and coordinate system
|
|
file_format = "fhws"
|
|
cosy = "device"
|
|
|
|
if args.file_format:
|
|
file_format = args.file_format
|
|
|
|
if args.cosy:
|
|
cosy = args.cosy
|
|
|
|
|
|
#My own data format
|
|
if file_format == "lukas":
|
|
delimiter = ","
|
|
time_cols = [40]
|
|
|
|
time = np.loadtxt(args.fname_sensor,
|
|
delimiter=delimiter,
|
|
usecols=time_cols,
|
|
skiprows = 1,
|
|
unpack=True)
|
|
|
|
if cosy == "device":
|
|
gyros_cols = [9, 10, 11]
|
|
lin_accel_cols = [6, 7, 8]
|
|
else:
|
|
gyros_cols = [34, 35,36]
|
|
lin_accel_cols = [37, 38, 39]
|
|
|
|
grav_cols = [3, 4, 5]
|
|
|
|
|
|
gyroX, gyroY, gyroZ = np.loadtxt(args.fname_sensor,
|
|
delimiter=delimiter,
|
|
usecols=gyros_cols,
|
|
skiprows = 1,
|
|
unpack=True)
|
|
|
|
rotation = np.loadtxt(args.fname_sensor,
|
|
delimiter = delimiter,
|
|
usecols=range(18,34),
|
|
skiprows=1,
|
|
unpack=True)
|
|
|
|
lin_accel_X, lin_accel_Y, lin_accel_Z = np.loadtxt(args.fname_sensor,
|
|
delimiter=delimiter,
|
|
usecols=lin_accel_cols,
|
|
skiprows=1,
|
|
unpack=True)
|
|
|
|
gravity_X, gravity_Y, gravity_Z = np.loadtxt(args.fname_sensor,
|
|
delimiter=delimiter,
|
|
usecols=grav_cols,
|
|
skiprows=1,
|
|
unpack=True)
|
|
|
|
rotation = rotation.T
|
|
rotation = [row.reshape((4,4)) for row in rotation]
|
|
# rotation = np.array(rotation).T
|
|
print rotation
|
|
|
|
gyro = np.array([gyroX, gyroY, gyroZ]).T
|
|
lin_accel = np.array([lin_accel_X, lin_accel_Y, lin_accel_Z]).T
|
|
gravity = np.array([gravity_X, gravity_Y, gravity_Z]).T
|
|
|
|
if cosy == "device":
|
|
world_gyro = rotate_data_lukas(gyro, rotation)
|
|
world_lin_accel = rotate_data_lukas(lin_accel, rotation)
|
|
else:
|
|
world_gyro = gyro
|
|
world_lin_accel = lin_accel
|
|
|
|
|
|
#FHWS file format
|
|
else:
|
|
time, gyro, lin_accel, gravity, rotation = read_data(args.fname_sensor)
|
|
|
|
if cosy == "device":
|
|
world_gyro = rotate_data_lukas(gyro, rotation)
|
|
world_lin_accel = rotate_data_lukas(lin_accel, rotation)
|
|
else:
|
|
print "Option 'fhws' in combination with 'world' not available"
|
|
return
|
|
|
|
|
|
gyroX = world_gyro[:,0]
|
|
gyroY = world_gyro[:,1]
|
|
gyroZ = world_gyro[:,2]
|
|
|
|
lin_accel_X = world_lin_accel[:,0]
|
|
lin_accel_Y = world_lin_accel[:,1]
|
|
lin_accel_Z = world_lin_accel[:,2]
|
|
|
|
|
|
#Parameters
|
|
#---------
|
|
|
|
time_interval = 500
|
|
|
|
if args.time:
|
|
time_interval = args.time
|
|
|
|
|
|
turns = detect_turns(time, gyroZ, time_interval)
|
|
motion = motion_axis(time, lin_accel, gravity, 500)
|
|
|
|
angles = []
|
|
for index, axis in enumerate(motion):
|
|
if index == 0:
|
|
angle = 0
|
|
else:
|
|
x_1 = motion[index-1][1]
|
|
y_1 = motion[index-1][2]
|
|
x_2 = axis[1]
|
|
y_2 = axis[2]
|
|
|
|
a = np.array([x_1, y_1])
|
|
b = np.array([x_2, y_2])
|
|
|
|
angle = angle_between(a,b)
|
|
angles.append((axis[0], angle))
|
|
|
|
|
|
np.set_printoptions(suppress=True)
|
|
turns = np.array(turns)
|
|
angles = np.array(angles)
|
|
|
|
|
|
if args.rad:
|
|
turns[:,1] = deg_to_rad(turns[:,1])
|
|
|
|
print "Sum of all angles: ", np.sum(turns[:,1])
|
|
write_to_file(args.fname_output, turns, angles)
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|
|
|
|
|
|
|
|
|
|
|