|
@@ -40,35 +40,94 @@ class CICFilter:
|
|
|
|
|
|
|
|
return output_signal
|
|
return output_signal
|
|
|
|
|
|
|
|
- def get_filter_freq_resp(self, num_points=1024):
|
|
|
|
|
|
|
+ def cascade_multiple_cic_filters(self, input_signal, num_filters):
|
|
|
|
|
+ output_signal = input_signal
|
|
|
|
|
+ for _ in range(num_filters):
|
|
|
|
|
+ output_signal = self.filter(output_signal)
|
|
|
|
|
+ return output_signal
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ def get_filter_freq_resp(self, num_points=1024, is_cascade=False, num_filters=1):
|
|
|
# Подаём дельта-функцию на вход фильтру
|
|
# Подаём дельта-функцию на вход фильтру
|
|
|
impulse = np.zeros(num_points)
|
|
impulse = np.zeros(num_points)
|
|
|
impulse[0] = 1.0 # Дельта-функция
|
|
impulse[0] = 1.0 # Дельта-функция
|
|
|
- output_signal = self.filter(impulse)
|
|
|
|
|
|
|
+ if is_cascade:
|
|
|
|
|
+ output_signal = self.cascade_multiple_cic_filters(impulse, num_filters)
|
|
|
|
|
+ else:
|
|
|
|
|
+ output_signal = self.filter(impulse)
|
|
|
# Вычисляем частотную характеристику с помощью БПФ
|
|
# Вычисляем частотную характеристику с помощью БПФ
|
|
|
freq_response = np.fft.fft(output_signal, num_points)
|
|
freq_response = np.fft.fft(output_signal, num_points)
|
|
|
# Нормализуем частотную характеристику
|
|
# Нормализуем частотную характеристику
|
|
|
freq_response /= np.max(np.abs(freq_response))
|
|
freq_response /= np.max(np.abs(freq_response))
|
|
|
# Создаем массив частот
|
|
# Создаем массив частот
|
|
|
- freq = np.fft.fftfreq(num_points, d=1/(50e6/self.decimation_factor))
|
|
|
|
|
|
|
+ if is_cascade:
|
|
|
|
|
+ freq = np.fft.fftfreq(num_points, d=1/(50e6/self.decimation_factor * num_filters))
|
|
|
|
|
+ else:
|
|
|
|
|
+ freq = np.fft.fftfreq(num_points, d=1/(50e6/self.decimation_factor))
|
|
|
# Возвращаем частоты и амплитуду частотной характеристики
|
|
# Возвращаем частоты и амплитуду частотной характеристики
|
|
|
return freq, np.abs(freq_response)
|
|
return freq, np.abs(freq_response)
|
|
|
|
|
|
|
|
# # Calculate coefficients for the FIR filter to compensate the CIC frequency response
|
|
# # Calculate coefficients for the FIR filter to compensate the CIC frequency response
|
|
|
# def get_compensation_coefficients(self, cic_freq_response, target_freq_response):
|
|
# def get_compensation_coefficients(self, cic_freq_response, target_freq_response):
|
|
|
|
|
|
|
|
|
|
+ def calculate_fir_compensator_coeffs(self, cic_freq_response, order, fs=50e6, compensation_factor=0.95):
|
|
|
|
|
+ """
|
|
|
|
|
+ Synthesizes FIR compensator coefficients for the CIC filter
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ cic_freq_response: frequency response of the CIC filter
|
|
|
|
|
+ order: order of the compensator filter
|
|
|
|
|
+ fs: sampling frequency in Hz
|
|
|
|
|
+ compensation_factor: degree of compensation (0-1)
|
|
|
|
|
+ """
|
|
|
|
|
+ # Create ideal frequency response for the compensator filter
|
|
|
|
|
+ half_len = len(cic_freq_response) // 2
|
|
|
|
|
+ desired_response = np.ones(len(cic_freq_response), dtype=complex)
|
|
|
|
|
+
|
|
|
|
|
+ # Define passband edge up to which compensation is needed (approximately 0.4*fs/R)
|
|
|
|
|
+ passband_edge = int(0.4 * half_len)
|
|
|
|
|
+
|
|
|
|
|
+ # Apply compensation only in the passband
|
|
|
|
|
+ min_magnitude = 1e-12
|
|
|
|
|
+ for i in range(passband_edge):
|
|
|
|
|
+ if np.abs(cic_freq_response[i]) > min_magnitude:
|
|
|
|
|
+ desired_response[i] = (1.0 / cic_freq_response[i]) ** compensation_factor
|
|
|
|
|
+ else:
|
|
|
|
|
+ desired_response[i] = (1.0 / min_magnitude) ** compensation_factor
|
|
|
|
|
+
|
|
|
|
|
+ # Symmetry for real filter
|
|
|
|
|
+ for i in range(1, passband_edge):
|
|
|
|
|
+ desired_response[len(cic_freq_response) - i] = np.conj(desired_response[i])
|
|
|
|
|
+
|
|
|
|
|
+ # Get impulse response
|
|
|
|
|
+ imp_resp = np.real(np.fft.ifft(desired_response))
|
|
|
|
|
+
|
|
|
|
|
+ # Center and trim to the required length
|
|
|
|
|
+ imp_resp = np.roll(imp_resp, -len(imp_resp) // 2)[:order + 1]
|
|
|
|
|
+
|
|
|
|
|
+ # Apply window function
|
|
|
|
|
+ window = np.hamming(len(imp_resp))
|
|
|
|
|
+ imp_resp = imp_resp * window
|
|
|
|
|
+
|
|
|
|
|
+ # Normalize for unity gain
|
|
|
|
|
+ imp_resp /= np.sum(imp_resp)
|
|
|
|
|
+
|
|
|
|
|
+ return imp_resp
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
|
|
|
def main():
|
|
def main():
|
|
|
- decimation_factor = 4
|
|
|
|
|
- delay = 8
|
|
|
|
|
- order = 6
|
|
|
|
|
- num_samples = 100000
|
|
|
|
|
|
|
+ decimation_factor = 10
|
|
|
|
|
+ delay = 2*decimation_factor
|
|
|
|
|
+ order = 4
|
|
|
|
|
+ num_samples = 100
|
|
|
# Input signal - multiplication of a 3 MHz sine and 6 MHz sine
|
|
# Input signal - multiplication of a 3 MHz sine and 6 MHz sine
|
|
|
fs = 50e6 # Sampling frequency
|
|
fs = 50e6 # Sampling frequency
|
|
|
f1 = 7e6 # Frequency of first sine wave
|
|
f1 = 7e6 # Frequency of first sine wave
|
|
|
f2 = 3e6 # Frequency of second sine wave
|
|
f2 = 3e6 # Frequency of second sine wave
|
|
|
t = np.arange(num_samples) / fs
|
|
t = np.arange(num_samples) / fs
|
|
|
- input_signal = np.sin(2 * np.pi * f1 * t) * np.sin(2 * np.pi * f2 * t)
|
|
|
|
|
|
|
+ input_signal = (np.sin(2 * np.pi * f1 * t) * np.sin(2 * np.pi * f2 * t))
|
|
|
# Plot the input signal in the frequency domain
|
|
# Plot the input signal in the frequency domain
|
|
|
plt.figure(figsize=(10, 6))
|
|
plt.figure(figsize=(10, 6))
|
|
|
plt.magnitude_spectrum(input_signal, Fs=fs, scale='dB', color='C0')
|
|
plt.magnitude_spectrum(input_signal, Fs=fs, scale='dB', color='C0')
|
|
@@ -81,7 +140,7 @@ def main():
|
|
|
# Create CIC filter instance
|
|
# Create CIC filter instance
|
|
|
cic_filter = CICFilter(decimation_factor, delay, order)
|
|
cic_filter = CICFilter(decimation_factor, delay, order)
|
|
|
# Filter the input signal
|
|
# Filter the input signal
|
|
|
- output_signal = cic_filter.filter(input_signal)
|
|
|
|
|
|
|
+ output_signal = cic_filter.cascade_multiple_cic_filters(input_signal, 1)
|
|
|
# Plot the output signal in the frequency domain
|
|
# Plot the output signal in the frequency domain
|
|
|
plt.magnitude_spectrum(output_signal, Fs=fs/decimation_factor, scale='dB', color='C1')
|
|
plt.magnitude_spectrum(output_signal, Fs=fs/decimation_factor, scale='dB', color='C1')
|
|
|
plt.title('Output Signal in Frequency Domain')
|
|
plt.title('Output Signal in Frequency Domain')
|
|
@@ -91,7 +150,7 @@ def main():
|
|
|
plt.tight_layout()
|
|
plt.tight_layout()
|
|
|
plt.show()
|
|
plt.show()
|
|
|
# Get frequency response of the CIC filter
|
|
# Get frequency response of the CIC filter
|
|
|
- freq, freq_response = cic_filter.get_filter_freq_resp()
|
|
|
|
|
|
|
+ freq, freq_response = cic_filter.get_filter_freq_resp(is_cascade=False, num_filters=1)
|
|
|
# Plot the frequency response
|
|
# Plot the frequency response
|
|
|
plt.figure(figsize=(10, 6))
|
|
plt.figure(figsize=(10, 6))
|
|
|
# Normalize frequency axis to 1
|
|
# Normalize frequency axis to 1
|
|
@@ -104,6 +163,32 @@ def main():
|
|
|
plt.legend()
|
|
plt.legend()
|
|
|
plt.tight_layout()
|
|
plt.tight_layout()
|
|
|
plt.show()
|
|
plt.show()
|
|
|
|
|
+
|
|
|
|
|
+ # # Calculate FIR compensator coefficients
|
|
|
|
|
+ # fir_coeffs = cic_filter.calculate_fir_compensator_coeffs(freq_response, 64, fs=fs, compensation_factor=0.95)
|
|
|
|
|
+
|
|
|
|
|
+ # fir_freq_response = np.fft.fft(fir_coeffs, len(freq_response))
|
|
|
|
|
+ # plt.figure(figsize=(10, 6))
|
|
|
|
|
+ # plt.plot(freq_normalized, 20 * np.log10(np.abs(fir_freq_response[:len(freq)//2])), label='FIR Compensator Frequency Response (Normalized)')
|
|
|
|
|
+ # plt.title('FIR Compensator Frequency Response')
|
|
|
|
|
+ # plt.xlabel('Frequncy [Hz]')
|
|
|
|
|
+ # plt.ylabel('Magnitugde [dB]')
|
|
|
|
|
+ # plt.grid()
|
|
|
|
|
+ # plt.tight_layout()
|
|
|
|
|
+ # plt.show()
|
|
|
|
|
+
|
|
|
|
|
+ # # Final Response
|
|
|
|
|
+ # final_response = fir_freq_response * freq_response
|
|
|
|
|
+ # plt.figure(figsize=(10, 6))
|
|
|
|
|
+ # plt.plot(freq_normalized, 20 * np.log10(np.abs(final_response[:len(freq)//2])), label='Final Response (CIC + FIR Compensator)')
|
|
|
|
|
+
|
|
|
|
|
+ # plt.title('Final Response of CIC Filter with FIR Compensator')
|
|
|
|
|
+ # plt.xlabel('Frequency [Hz]')
|
|
|
|
|
+ # plt.ylabel('Magnitude [dB]')
|
|
|
|
|
+ # plt.grid()
|
|
|
|
|
+ # plt.legend()
|
|
|
|
|
+ # plt.tight_layout()
|
|
|
|
|
+ # plt.show()
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
if __name__ == "__main__":
|
|
|
main()
|
|
main()
|