#ifndef FIRCOMPLEX_H #define FIRCOMPLEX_H #include #include #include "../../../Assertions.h" /** * FIR filter using complex convolution */ class FIRComplex { /** signal's sample-rate */ int sRate_hz; /** the created convolution kernel */ std::vector> kernel; /** incoming data */ std::vector> data; public: /** ctor with signal's sample-rate */ FIRComplex(const int sRate_hz) : sRate_hz(sRate_hz) { ; } /** get the internal kernel */ const std::vector> getKernel() const { return kernel; } /** configure as lowpass with the given cutoff and 2*size+1 */ void lowPass(const int cutOff_hz, const int size) { this->kernel = getLowpass(cutOff_hz, sRate_hz, size); } /** shift the constructed filter by the given hz-rate */ void shiftBy(const int shift_hz) { shiftKernel(shift_hz, sRate_hz); } /** filter the given incoming real data */ std::vector> append(const std::vector& newData) { // append to local buffer (as we need some history) //data.insert(data.end(), newData.begin(), newData.end()); for (const float f : newData) { data.push_back(std::complex(f, 0)); // real = value, imag = 0; } return processLocalBuffer(); } /** filter the given incoming complex data */ std::vector> append(const std::vector>& newData) { // append to local buffer (as we need some history) data.insert(data.end(), newData.begin(), newData.end()); return processLocalBuffer(); } /** filter the given incoming real value */ std::complex append(const float val) { data.push_back(std::complex(val, 0)); auto tmp = processLocalBuffer(); if (tmp.size() == 0) {return std::complex(NAN, NAN);} if (tmp.size() == 1) {return tmp[0];} throw Exception("FIRComplex:: detected invalid result"); } /** filter the given incoming real value */ std::complex append(const std::complex c) { data.push_back(c); auto tmp = processLocalBuffer(); if (tmp.size() == 0) {return std::complex(NAN, NAN);} if (tmp.size() == 1) {return tmp[0];} throw Exception("FIRComplex:: detected invalid result"); } void dumpKernel(const std::string& file, const std::string& varName) { std::ofstream out(file); out << "# name: " << varName << "\n"; out << "# type: complex matrix\n"; out << "# rows: " << kernel.size() << "\n"; out << "# columns: 1\n"; for (const std::complex c : kernel) { out << "(" << c.real() << "," << c.imag() << ")" << "\n"; } out.close(); } private: std::vector> processLocalBuffer() { // sanity check Assert::isNot0(kernel.size(), "FIRComplex:: kernel not yet configured!"); // number of processable elements (due to filter size) const int processable = data.size() - kernel.size() + 1 - kernel.size()/2; // nothing to-do? if (processable <= 0) {return std::vector>();} // result-vector std::vector> res; res.resize(processable); // fire convolve(data.data(), res.data(), processable); // drop processed elements from the local buffer data.erase(data.begin(), data.begin() + processable); // done return res; } template void convolve(const std::complex* src, T* dst, const size_t cnt) { const size_t ks = kernel.size(); for (size_t i = 0; i < cnt; ++i) { T t = T(); for (size_t j = 0; j < ks; ++j) { t += src[j+i] * kernel[j]; } if (t != t) {throw std::runtime_error("detected NaN");} dst[i] = t; } } // template void convolve(const float* src, T* dst, const size_t cnt) { // const size_t ks = kernel.size(); // for (size_t i = 0; i < cnt; ++i) { // T t = T(); // for (size_t j = 0; j < ks; ++j) { // t += std::complex(src[j+i], 0) * kernel[j]; // } // if (t != t) {throw std::runtime_error("detected NaN");} // dst[i] = t; // } // } /** get a value from the hamming window */ static double winHamming(const double t, const double size) { return 0.54 - 0.46 * std::cos(2 * M_PI * t / size); } /** frequency shift the kernel by multiplying with a frequency */ void shiftKernel(const int shift_hz, const int sRate_hz) { for (size_t i = 0; i < kernel.size(); ++i) { const float t = (float) i / (float) sRate_hz; const float real = std::cos(t * 2 * M_PI * shift_hz); const float imag = std::sin(t * 2 * M_PI * shift_hz); kernel[i] = kernel[i] * std::complex(real, imag); } } // https://dsp.stackexchange.com/questions/4693/fir-filter-gain /** normalize using the DC-part of the kernel */ static void normalizeDC(std::vector>& kernel) { std::complex sum; for (auto f : kernel) {sum += f;} for (auto& f : kernel) {f /= sum;} } // https://dsp.stackexchange.com/questions/4693/fir-filter-gain static void normalizeAC(std::vector>& kernel, const float freq) { throw std::runtime_error("TODO"); } /** build a lowpass filter kernel */ static std::vector> getLowpass(const int cutOff, const int sRate, const int n) { std::vector> kernel; for (int i = -n; i <= +n; ++i) { const double t = (double) i / (double) sRate; const double tmp = 2 * M_PI * cutOff * t; const double val = (tmp == 0) ? (1) : (std::sin(tmp) / tmp); const double win = winHamming(i+n, n*2); const double res = val * win;// * 0.5f; // why 0.5? if (res != res) {throw std::runtime_error("detected NaN");} kernel.push_back( std::complex(res, 0) ); } // important!!! normalize so the original frequencies stay at 0dB normalizeDC(kernel); // dc works for low-pass filter only as this one contains DC! return kernel; } }; #endif // FIRCOMPLEX_H