Commit 88e24d90 authored by Smail Bachir's avatar Smail Bachir Committed by Thomas Gambier

Add DPD code, with README and examples

parent 0c713e9e
# Python Pluto DPD # pyPlutoDPD
Pre-Distortion for Linearization of RF power amplifier in python using ADALM PLUTO.
## Overview
The pyPlutoDPD is a python implementation of Digital Pre-Distortion, a most used technique to linearize the distortions introiduced by the RF Power Amplifiers.
This has been developed under python by: </br>
- Smail Bachir, University of Poitiers, XLIM-Laboratory UMR-CNRS 7252
in collaboration with:
- Claude Duvanaud, University of Poitiers, XLIM-Laboratory UMR-CNRS 7252
- Jean-Marc Ouvrard, Rapid Space
- Thomas Gambier, Rapid Space
LMS Algorithm running with a polular ADALM-PlutoSDR has been implemneted.
<b> Pre-requises </b> : pyPlutoDPD requires only free dependances :
- Python with packages : numpy, adi, matplotlib, scipy
- Install PlutoSDR in Python : see https://pysdr.org/content/pluto.html
- Cheap board : Adalm PlutoSDR
<b> Setup </b> : pyPlutoDPD can be used with two configuations
1. Minimale configuration with only ADALM-Pluto SDR connected to himself (SMA wire bettween TX and RX). This allows to test the program and see the steps.
2. Configuration with DUT (Device Under Test) : Here, insert your RF-PA bettween TX and RX. In our case, we linearize a RF PA of .....
# Main program
### Required packages
```python
import numpy as np
import adi # API for Adalm-Pluto
import matplotlib.pyplot as plt
from scipy.io import loadmat, savemat
from scipy.signal import get_window
from scipy.fft import fft, fftshift
from scipy import signal
import time
start = time.time() # start time to measure the execution time
```
Load the used functions located in the ./commonFunctions.py
```python
from commonFunctions import * # XLIM used functions
```
## Transmission without DPD
### Transmit original samples
Load transmitted samples en .bin format
The preambule samples are used for synchronization
You can use "lte-5G/BW-10MHz with 61.44MHz of sampling rate" or "WLAN/BW-10MHz with 60MHz of sampling sate"
```python
# Read the frame
path = ""
osf = 3
filePreambule = "preambule_BW20osf" + str(osf) + "_single.bin"
fileRx = "lteBW20osf" + str(osf) + "_single.bin"
sample_rate = 61.44e6 # 61.44e6 46.08e6 30.72e6 23.04e6 11.52e6
#fileRx = "wlanBW20osf" + str(osf) + "_single.bin"
#sample_rate = 60e6
fp = open(path + filePreambule, 'rb')
preambule = np.fromfile(fp, dtype=np.complex64)
fp = open(path + fileRx, 'rb')
wave = np.fromfile(fp, dtype=np.complex64)
samples = np.ndarray(len(preambule) + len(wave), dtype=np.complex64)
samples[:len(preambule)] = preambule
samples[len(preambule):] = wave
del preambule, wave
txPower = np.round(10*np.log10(np.mean(np.abs(samples)**2)), 3)
print("TX avg-power = ", txPower, " dBm") # Print the txPower
num_samps = len(samples)
# The PlutoSDR expects samples between -2^14 and +2^14
coeff = 2**14
samples *= coeff
```
TX avg-power = -10.532 dBm
Connect the sdr pluto
```python
sdr = adi.Pluto("ip:192.168.2.1")
sdr.sample_rate = sample_rate
```
Config Tx and start transmitting
```python
freq = int(3650)
win_fft = 'flattop' # The used window for FFT ans spectrum plot
initialGain = int(0) # Here, the initial gain of TX is set to the minimal value for safety
sdr.tx_rf_bandwidth = int(sample_rate) # filter cutoff
sdr.tx_lo = freq*10**6
sdr.tx_hardwaregain_chan0 = initialGain #
sdr.tx_destroy_buffer() # Stop transmitting for safe
sdr.tx_cyclic_buffer = True # Enable cyclic buffers
safetyGain = float(0) # set your tx gain limitation according to your DUT
sdr.tx(samples) # start transmitting
```
Loop for tx_gain monitoring in real time
```python
valid = False
while (valid):
valid = input("1 to set txGain || 0 to compute DPD : ")
valid = bool(int(valid))
if valid == True:
value = input(f"enter new txGain (Actual txGain = {sdr.tx_hardwaregain_chan0}) : ")
gain = float(value)
if gain < safetyGain: ## Gain limitation for safety
sdr.tx_hardwaregain_chan0 = gain
else:
sdr.tx_hardwaregain_chan0 = safetyGain
```
### Receive the original samples
```python
sdr.rx_lo = sdr.tx_lo
sdr.rx_rf_bandwidth = sdr.tx_rf_bandwidth
sdr.rx_buffer_size = 3*num_samps # 3 times to capture one sequence
sdr.gain_control_mode_chan0 = 'manual'
sdr.rx_hardwaregain_chan0 = 8 # set the receive gain, but be careful not to saturate the ADC
```
Clear buffer just to be safe
```python
for i in range (0, 10):
raw_data = sdr.rx()
```
Receive samples
```python
rx_samples = sdr.rx()
avg_pwr = np.mean(np.abs(rx_samples/coeff)**2)
avg_pwr_dB = round(10*np.log10(avg_pwr),2)
print("check_rx = ", avg_pwr_dB, ": Best in -30 +/- 5. Else, adjust the rx_hardwaregain")
```
check_rx = -29.38 : Best in -30 +/- 5. Else, adjust the rx_hardwaregain
Synchronization and coarse phase correction using the preambule (zeroing AM/PM)
```python
[paIn, paOut] = corelation(samples, rx_samples, osf) # Synchronization using cross-corelation
```
```python
paOut = coarsePhaseCorrection(paIn, paOut, osf) # Coarse phase correction using the preambule (zeroing AM/PM)
paIn /= np.max(np.abs(paIn)) # scaling
paOut /= np.max(np.abs(paOut)) # scaling
```
Plot freq domain
```python
[pxxWithout, f] = toPlotSpectrum(paOut, sample_rate, win_fft, 20, "all")
plt.figure(10)
plt.plot((f + freq)/1e6, 10*np.log10(pxxWithout))
plt.grid()
plt.xlabel('Frequency (MHz)'); plt.ylabel('Spectrum')
plt.legend(['Original signal'])
plt.show()
```
![png](img/output_26_0.png)
Plot AM/AM and AM/PM
```python
amam_ampm_plot(paIn, paOut)
plt.show()
```
![png](img/output_28_0.png)
## DPD Computation
$$
{y_{dpd}}(n) = \displaystyle \sum_{k=0}^{K_a-1} \sum_{l=0}^{L_a-1} a_{k l} \, x(n-l) \cdot |x(n-l)|^{k} + \sum_{k=1}^{K_b} \sum_{l=0}^{L_b-1} \sum_{m=1}^{M_b} b_{k l m} \, x(n-l) \cdot |x(n-l-m)|^{k} + \sum_{k=1}^{K_c} \sum_{l=0}^{L_c-1} \sum_{m=1}^{M_c} c_{k l m} \, x(n-l) \cdot |x(n-l+m)|^{k}
$$
$x(n)$ : dpd input (complex envelope) <br />
$y_{dpd}(n)$ : dpd output (complex envelope)
```python
dpdModel = {'fs': sample_rate}
[Ka, La, Kb, Lb, Mb, Kc, Lc, Mc] = [7, 1, 0, 1, 1, 0, 1, 1] # set the dpd orders and memory depths
dpdModel['parameters'] = {'Ka':Ka, 'La':La, 'Kb':Kb, 'Lb':Lb, 'Mb':Mb, 'Kc':Kc, 'Lc':Lc, 'Mc':Mc};
gainATT = 1
dpdModel = identif_dpd_GMP(paIn, paOut/gainATT, dpdModel)
print("nmse of identification :", np.round(dpdModel['nmseTimedB'],2), " dB")
dpdOutput = simGMPopt(paIn, dpdModel) # generate DPD signal
dpdOutput = dpdOutput/np.max(np.abs(dpdOutput))
txPower = np.round(10*np.log10(np.mean(np.abs(dpdOutput)**2)), 3)
print("dpd tx-power = ", txPower, " dBm")
dpdOutput *= coeff # scaling for ADALM-Pluto
```
nmse of identification : -32.15 dB
dpd tx-power = -11.255 dBm
## Transmission with DPD
### Transmit DPD signal
```python
sdr.tx_destroy_buffer() # Stop transmitting for safe
sdr.tx_cyclic_buffer = True # Enable cyclic buffers
sdr.tx(dpdOutput) # start transmitting with DPD
valid = False
while (valid):
valid = input("1 to set txGain || 0 to compute DPD : ")
valid = bool(int(valid))
if valid == True:
value = input(f"enter new txGain (Actual txGain = {sdr.tx_hardwaregain_chan0}) : ")
gain = float(value)
if gain < safetyGain: ## Gain limitation for safety
sdr.tx_hardwaregain_chan0 = gain
else:
sdr.tx_hardwaregain_chan0 = safetyGain
# savemat('36dBm.mat', {'paInput':paIn, 'paOutput':paOut, 'dpdOutput':dpdOutput, 'dpdModel':dpdModel})
# np.savetxt('outfile.txt', rx_samples.view(complex))
```
### Receive with DPD signal
```python
rx_samples = sdr.rx()
avg_pwr = np.mean(np.abs(rx_samples/coeff)**2)
avg_pwr_dB = round(10*np.log10(avg_pwr),2)
print("check_rx = ", avg_pwr_dB, ": Best in -30 +/- 5. Else, adjust the rx_hardwaregain")
```
check_rx = -29.39 : Best in -30 +/- 5. Else, adjust the rx_hardwaregain
Synchronization of input and output
```python
[paIn, paLin] = corelation(samples, rx_samples, osf)
paLin = coarsePhaseCorrection(paIn, paLin, osf)
paIn /= np.max(np.abs(paIn)) # scaling
paLin /= np.max(np.abs(paLin)) # scaling
```
Plot Linearized AM/AM and AM/PM
```python
amam_ampm_plot(paIn, paLin)
plt.show()
print("nmse after linearization : ", np.round(nmse(paIn, paLin), 2), " dB")
```
![png](img/output_40_0.png)
nmse after linearization : -20.5 dB
```python
plt.figure()
plt.subplot(221)
plt.plot(np.real(paIn))
plt.plot(np.real(paLin))
plt.title("I/O in phase")
plt.xlabel("Samples")
plt.subplot(222)
plt.plot(np.imag(paIn))
plt.plot(np.imag(paLin))
plt.title("I/O in quadrature")
plt.xlabel("Samples")
plt.subplot(223)
plt.plot(np.real(paIn)-np.real(paLin))
plt.title("Error in phase (I)")
plt.xlabel("Samples")
plt.subplot(224); plt.plot(np.imag(paIn)-np.imag(paLin))
plt.title("Error in qudadrature (Q)")
plt.xlabel("Samples")
plt.show()
```
![png](img/output_41_0.png)
Compare spectrum with and without DPD
```python
[pxxWith, f] = toPlotSpectrum(paLin, sample_rate, win_fft, 20, "all")
plt.figure(10)
plt.plot((f + freq)/1e6, 10*np.log10(pxxWith))
plt.grid()
plt.xlabel('Frequency (MHz)'); plt.ylabel('Spectrum'); plt.legend(['With DPD'])
plt.show()
```
![png](img/output_43_0.png)
Stop transmitting
```python
sdr.tx_destroy_buffer()
```
Print the DPD coefficients
```python
end = time.time()
elapsed = end - start
print(f'Elapsed time : {elapsed:.2}ms')
```
Elapsed time : 3.5ms
```python
for i in dpdModel['coefGMP']:
print(np.round(i,3))
```
(0.911+0.003j)
(-0.163-0.035j)
(0.934+0.218j)
(-2.078-0.554j)
(2.636+0.797j)
(-1.658-0.524j)
(0.414+0.105j)
This project uses python code and Adalm Pluto board to characterize a radio PA and apply a Digital Pre Distorsion (DPD) on a signal to improve the linearity of the PA.
\ No newline at end of file
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import get_window
from scipy.fft import fft, fftshift
from scipy import signal
def nmse(xMes, xEst):
"""
Function to compute the NMSE in time domain
"""
error = xMes - xEst
nmse_dB = 10 * np.log10(np.sum(np.abs(error)**2) / np.sum(np.abs(xMes)**2))
return nmse_dB
######################################
def algoLMS(PPHI, out, start, nSample):
"""
Classical LMS Algorithm
gmpModel['coefGMP'] = np.linalg.inv((np.dot(np.conj(PPHI.T), PPHI))) @ np.conj(PPHI.T) @ out
"""
"""
LMS Algorithm with regularization
"""
PHI = PPHI[start:nSample, :]
U, S, V = np.linalg.svd(np.conj(PHI.T) @ PHI)
Splus = np.diag(1 / np.where(S > 1e-8, S, np.inf))
theta = (U @ Splus @ np.conj(U.T)) @ np.conj(PHI.T) @ out[start:nSample]
H = (U @ Splus @ np.conj(U.T)) @ np.conj(PHI.T)
return theta, H
########################################
def computeFFT(x, fs, win):
"""
Perform FFT on x
x : signal
fx : sampling rate
win : fft windows
returns
fftAbs : fft absolute value
freq : frequency array
"""
N = len(x)
fft_block_size = N
w = get_window(win, fft_block_size)
fftSig = fft(x * w) / fft_block_size
fftAbs = np.abs(fftSig)
freq = (np.arange(0, N) - N/2) * fs / fft_block_size
freq = fftshift(freq) # Shift zero frequency component to center of spectrum
return fftAbs, freq
#########################################
def toPlotSpectrum(x, fs, win, Nblocks, interval):
"""
To plot the spectrum
x : signal
fx : sampling rate
win : fft windows
Nbblocks : number of blocks to perform average spectrum
interval = "half" to spectrum in [0 fs/2] else spectrum in [-fs/2 fs/2]
returns
pxx : average fft absolute value
freq : frequency array
"""
# Spectrum computation (one or two sided)
N = len(x)
fft_block_size = int(2**(np.ceil(np.log2(N/Nblocks))-1))
window = get_window(win, fft_block_size)
fft_input = np.zeros(fft_block_size, dtype='complex128')
for n in range(1, Nblocks+1):
idx = slice((n-1)*fft_block_size, n*fft_block_size)
fft_block = fft(x[idx] * window) / fft_block_size
fft_input += (fft_block * np.conj(fft_block))
fft_input /= Nblocks
if interval == "half": # spectre entre [0 fs/2]
fft_input = fft_input[:fft_block_size//2]
pxx = np.abs(fft_input)
freq = np.arange(0, fs/2, fs/(2*fft_block_size))
else: # spectre entre [-fs/2 fs/2]
pxx = np.abs(fft_input)
pxx = fftshift(pxx)
freq = (np.arange(0, fft_block_size) - fft_block_size/2) * fs / fft_block_size
return pxx, freq
#################################################
def corelation(tx, rx, osf):
"""
Performs tx/rx synchronization and signal alignement
tx : transmitted signal
rx : received signal
osf : over-sampling factor
returns
txSig, rxSig : synchonized tx and rx signals
"""
preambule = 10*80 # 2 STF + 2 LTF + 1 SIG + 5 another
n = int(osf*preambule)
# Resample transmit waveform
rap = 16 # Another upSampling for best synchronization
tx = signal.resample(tx, len(tx) * rap)
rx = signal.resample(rx, len(rx) * rap)
nSamples = len(tx)
lag = xcorr(np.abs(rx[:nSamples]), np.abs(tx[:n*rap]))
# Data alignement and downSampling
start = 0
txSig = tx[start:len(tx)]
rxSig = rx[lag+start:lag + start + len(txSig)]
txSig = signal.resample(txSig, len(txSig) // rap)
rxSig = signal.resample(rxSig, len(rxSig) // rap)
return txSig, rxSig
#################################################
def xcorr(x,y):
"""
Perform Cross-Correlation on x and y
x : 1st signal
y : 2nd signal
returns
lags : lags of correlation
corr : coefficients of correlation
"""
corr = signal.correlate(x, y, mode="full")
lags = signal.correlation_lags(len(x), len(y), mode="full")
lag = lags[np.argmax(corr)]
return lag
#################################################
def coarsePhaseCorrection(x, y, osf):
## Coarse phase correction using the preambule (zeroing AM/PM)
preambule = 5*80 # 2 STF + 2 LTF + 1 SIG
n = int(osf*preambule)
nSample = len(x)
dephase = np.angle(np.conj(x[:n].T) @ y[:n])
y = y * np.exp(-1j * dephase)
return y
##################################################
def amam_ampm_plot(tx, ty):
"""
Perform AM/AM and AM/PM curves
ttx : transmitted signal
rx : received signal
"""
plt.figure()
plt.subplot(211)
plt.plot(np.abs(tx), np.abs(ty), '.')
plt.title('Dynamical AM/AM')
plt.xlabel('Normalized input')
plt.ylabel('Normalized output')
plt.subplot(212)
plt.plot(np.abs(tx), np.angle(np.conj(tx) * ty),'.')
plt.title('Dynamical AM/PM')
plt.xlabel('Normalized input')
plt.ylabel('Dephase (deg)')
###################################################
def identif_dpd_GMP(x, y, dpdModel):
dpdModel = estimGMPopt(y, x, dpdModel)
dpdModel['nmseTimedB'] = nmse(x, dpdModel['outEst'])
return dpdModel
#################################################
def estimGMPopt(x, y, gmpModel):
"""
To estimate GMP coefficients
x : reference signal
y : measured signal
gmpModel : object containing model parameters
returns
gmpModel : gmpModel with the estimated coefficients
"""
Ka = gmpModel['parameters']['Ka']
La = gmpModel['parameters']['La']
Kb = gmpModel['parameters']['Kb']
Lb = gmpModel['parameters']['Lb']
Mb = gmpModel['parameters']['Mb']
Kc = gmpModel['parameters']['Kc']
Lc = gmpModel['parameters']['Lc']
Mc = gmpModel['parameters']['Mc']
gmpModel['nbCoef'] = Ka * La + Kb * Lb * Mb + Kc * Lc * Mc
nSample = len(x)
PPHI = np.zeros((nSample, gmpModel['nbCoef']), dtype=np.complex128)
for l in range(La):
it = np.concatenate((np.zeros(l), x[:nSample-l]))
for k in range(Ka):
PPHI[:, (La * k) + l] = (np.abs(it))**k * it
for l in range(Lb):
it2 = np.concatenate((np.zeros(l), x[:nSample-l]))
for m in range(1, Mb + 1):
it1 = np.concatenate((np.zeros(l + m), x[:nSample-l-m]))
for k in range(1, Kb + 1):
PPHI[:, La * Ka + (Lb * Mb * (k - 1)) + (Mb * l) + (m-1)] = (np.abs(it1))**k * it2
for l in range(Lc):
it2 = np.concatenate((np.zeros(l), x[:nSample-l]))
for m in range(1, Mc + 1):
if l >= m:
it1 = np.concatenate((np.zeros(l - m), x[:nSample-l+m]))
else:
it1 = np.concatenate((np.zeros(m - l), x[m-l:nSample])) * 0
for k in range(1, Kc + 1):
PPHI[:, La * Ka + Lb * Kb * Mb + (Lc * Mc * (k - 1)) + (Mc * l) + (m-1)] = (np.abs(it1))**k * it2
start = 0
gmpModel['coefGMP'], gmpModel['H'] = algoLMS(PPHI, y, start, nSample)
gmpModel['outEst'] = PPHI @ gmpModel['coefGMP']
return gmpModel
######################################################
def simGMPopt(x, gmpModel):
"""
Simulates GMP model
x : input signal (stimulus)
gmpModel : object containing model parameters
returns
outEst : simulated output
"""
Ka = gmpModel['parameters']['Ka']
La = gmpModel['parameters']['La']
Kb = gmpModel['parameters']['Kb']
Lb = gmpModel['parameters']['Lb']
Mb = gmpModel['parameters']['Mb']
Kc = gmpModel['parameters']['Kc']
Lc = gmpModel['parameters']['Lc']
Mc = gmpModel['parameters']['Mc']
nSample = len(x)
PPHI = np.zeros((nSample, gmpModel['nbCoef']), dtype=np.complex128)
for l in range(La):
it = np.concatenate((np.zeros(l), x[:nSample-l]))
for k in range(Ka):
PPHI[:, (La * k) + l] = (np.abs(it))**k * it
for l in range(Lb):
it2 = np.concatenate((np.zeros(l), x[:nSample-l]))
for m in range(1, Mb + 1):
it1 = np.concatenate((np.zeros(l + m), x[:nSample-l-m]))
for k in range(1, Kb + 1):
PPHI[:, La * Ka + (Lb * Mb * (k - 1)) + (Mb * l) + (m-1)] = (np.abs(it1))**k * it2
for l in range(Lc):
it2 = np.concatenate((np.zeros(l), x[:nSample-l]))
for m in range(1, Mc + 1):
if l >= m:
it1 = np.concatenate((np.zeros(l - m), x[:nSample-l+m]))
else:
it1 = np.concatenate((np.zeros(m - l), x[m-l:nSample])) * 0
for k in range(1, Kc + 1):
PPHI[:, La * Ka + Lb * Kb * Mb + (Lc * Mc * (k - 1)) + (Mc * l) + (m-1)] = (np.abs(it1))**k * it2
outEst = np.zeros(nSample, dtype=np.complex128)
outEst = PPHI @ gmpModel['coefGMP']
return outEst
##################################################
This source diff could not be displayed because it is too large. You can view the blob instead.
#!/usr/bin/env python
# coding: utf-8
# # pyPlutoDPD
# Pre-Distortion for Linearization of RF power amplifier in python using ADALM PLUTO.
# ## Overview
# The pyPlutoDPD is a python implementation of Digital Pre-Distortion, a most used technique to linearize the distortions introiduced by the RF Power Amplifiers.
#
# This has been developed under python by: </br>
# - Smail Bachir, University of Poitiers, XLIM-Laboratory UMR-CNRS 7252
#
# in collaboration with:
# - Claude Duvanaud, University of Poitiers, XLIM-Laboratory UMR-CNRS 7252
# - Jean-Marc Ouvrard, Rapid Space
# - Thomas Gambier, Rapid Space
#
# LMS Algorithm running with a polular ADALM-PlutoSDR has been implemneted.
#
# <b> Pre-requises </b> : pyPlutoDPD requires only free dependances :
# - Python with packages : numpy, adi, matplotlib, scipy
# - Install PlutoSDR in Python : see https://pysdr.org/content/pluto.html
# - Cheap board : Adalm PlutoSDR
#
# <b> Setup </b> : pyPlutoDPD can be used with two configuations
# 1. Minimale configuration with only ADALM-Pluto SDR connected to himself (SMA wire bettween TX and RX). This allows to test the program and see the steps.
# 2. Configuration with DUT (Device Under Test) : Here, insert your RF-PA bettween TX and RX. In our case, we linearize a RF PA of .....
# # Main program
# ### Required packages
# In[47]:
import numpy as np
import adi # API for Adalm-Pluto
import matplotlib.pyplot as plt
from scipy.io import loadmat, savemat
from scipy.signal import get_window
from scipy.fft import fft, fftshift
from scipy import signal
import time
start = time.time() # start time to measure the execution time
# Load the used functions located in the ./commonFunctions.py
# In[48]:
from commonFunctions import * # XLIM used functions
# ## Transmission without DPD
# ### Transmit original samples
# Load transmitted samples en .bin format
# The preambule samples are used for synchronization
# You can use "lte-5G/BW-10MHz with 61.44MHz of sampling rate" or "WLAN/BW-10MHz with 60MHz of sampling sate"
# In[49]:
# Read the frame
path = ""
osf = 3
filePreambule = "preambule_BW20osf" + str(osf) + "_single.bin"
fileRx = "lteBW20osf" + str(osf) + "_single.bin"
sample_rate = 61.44e6 # 61.44e6 46.08e6 30.72e6 23.04e6 11.52e6
#fileRx = "wlanBW20osf" + str(osf) + "_single.bin"
#sample_rate = 60e6
fp = open(path + filePreambule, 'rb')
preambule = np.fromfile(fp, dtype=np.complex64)
fp = open(path + fileRx, 'rb')
wave = np.fromfile(fp, dtype=np.complex64)
samples = np.ndarray(len(preambule) + len(wave), dtype=np.complex64)
samples[:len(preambule)] = preambule
samples[len(preambule):] = wave
del preambule, wave
txPower = np.round(10*np.log10(np.mean(np.abs(samples)**2)), 3)
print("TX avg-power = ", txPower, " dBm") # Print the txPower
num_samps = len(samples)
# The PlutoSDR expects samples between -2^14 and +2^14
coeff = 2**14
samples *= coeff
# Connect the sdr pluto
# In[50]:
sdr = adi.Pluto("ip:192.168.2.1")
sdr.sample_rate = sample_rate
# Config Tx and start transmitting
# In[51]:
freq = int(3650)
win_fft = 'flattop' # The used window for FFT ans spectrum plot
initialGain = int(0) # Here, the initial gain of TX is set to the minimal value for safety
sdr.tx_rf_bandwidth = int(sample_rate) # filter cutoff
sdr.tx_lo = freq*10**6
sdr.tx_hardwaregain_chan0 = initialGain #
sdr.tx_destroy_buffer() # Stop transmitting for safe
sdr.tx_cyclic_buffer = True # Enable cyclic buffers
safetyGain = float(0) # set your tx gain limitation according to your DUT
sdr.tx(samples) # start transmitting
# Loop for tx_gain monitoring in real time
# In[52]:
valid = False
while (valid):
valid = input("1 to set txGain || 0 to compute DPD : ")
valid = bool(int(valid))
if valid == True:
value = input(f"enter new txGain (Actual txGain = {sdr.tx_hardwaregain_chan0}) : ")
gain = float(value)
if gain < safetyGain: ## Gain limitation for safety
sdr.tx_hardwaregain_chan0 = gain
else:
sdr.tx_hardwaregain_chan0 = safetyGain
# ### Receive the original samples
# In[53]:
sdr.rx_lo = sdr.tx_lo
sdr.rx_rf_bandwidth = sdr.tx_rf_bandwidth
sdr.rx_buffer_size = 3*num_samps # 3 times to capture one sequence
sdr.gain_control_mode_chan0 = 'manual'
sdr.rx_hardwaregain_chan0 = 8 # set the receive gain, but be careful not to saturate the ADC
# Clear buffer just to be safe
# In[54]:
for i in range (0, 10):
raw_data = sdr.rx()
# Receive samples
# In[55]:
rx_samples = sdr.rx()
avg_pwr = np.mean(np.abs(rx_samples/coeff)**2)
avg_pwr_dB = round(10*np.log10(avg_pwr),2)
print("check_rx = ", avg_pwr_dB, ": Best in -30 +/- 5. Else, adjust the rx_hardwaregain")
# Synchronization and coarse phase correction using the preambule (zeroing AM/PM)
# In[56]:
[paIn, paOut] = corelation(samples, rx_samples, osf) # Synchronization using cross-corelation
# In[57]:
paOut = coarsePhaseCorrection(paIn, paOut, osf) # Coarse phase correction using the preambule (zeroing AM/PM)
paIn /= np.max(np.abs(paIn)) # scaling
paOut /= np.max(np.abs(paOut)) # scaling
# Plot freq domain
# In[58]:
[pxxWithout, f] = toPlotSpectrum(paOut, sample_rate, win_fft, 20, "all")
plt.figure(10)
plt.plot((f + freq)/1e6, 10*np.log10(pxxWithout))
plt.grid()
plt.xlabel('Frequency (MHz)'); plt.ylabel('Spectrum')
plt.legend(['Original signal'])
plt.show()
# Plot AM/AM and AM/PM
# In[59]:
amam_ampm_plot(paIn, paOut)
plt.show()
# ## DPD Computation
# $$
# {y_{dpd}}(n) = \displaystyle \sum_{k=0}^{K_a-1} \sum_{l=0}^{L_a-1} a_{k l} \, x(n-l) \cdot |x(n-l)|^{k} + \sum_{k=1}^{K_b} \sum_{l=0}^{L_b-1} \sum_{m=1}^{M_b} b_{k l m} \, x(n-l) \cdot |x(n-l-m)|^{k} + \sum_{k=1}^{K_c} \sum_{l=0}^{L_c-1} \sum_{m=1}^{M_c} c_{k l m} \, x(n-l) \cdot |x(n-l+m)|^{k}
# $$
#
# $x(n)$ : dpd input (complex envelope) <br />
# $y_{dpd}(n)$ : dpd output (complex envelope)
# In[37]:
dpdModel = {'fs': sample_rate}
[Ka, La, Kb, Lb, Mb, Kc, Lc, Mc] = [7, 1, 0, 1, 1, 0, 1, 1] # set the dpd orders and memory depths
dpdModel['parameters'] = {'Ka':Ka, 'La':La, 'Kb':Kb, 'Lb':Lb, 'Mb':Mb, 'Kc':Kc, 'Lc':Lc, 'Mc':Mc};
gainATT = 1
dpdModel = identif_dpd_GMP(paIn, paOut/gainATT, dpdModel)
print("nmse of identification :", np.round(dpdModel['nmseTimedB'],2), " dB")
dpdOutput = simGMPopt(paIn, dpdModel) # generate DPD signal
dpdOutput = dpdOutput/np.max(np.abs(dpdOutput))
txPower = np.round(10*np.log10(np.mean(np.abs(dpdOutput)**2)), 3)
print("dpd tx-power = ", txPower, " dBm")
dpdOutput *= coeff # scaling for ADALM-Pluto
# ## Transmission with DPD
# ### Transmit DPD signal
# In[38]:
sdr.tx_destroy_buffer() # Stop transmitting for safe
sdr.tx_cyclic_buffer = True # Enable cyclic buffers
sdr.tx(dpdOutput) # start transmitting with DPD
valid = False
while (valid):
valid = input("1 to set txGain || 0 to compute DPD : ")
valid = bool(int(valid))
if valid == True:
value = input(f"enter new txGain (Actual txGain = {sdr.tx_hardwaregain_chan0}) : ")
gain = float(value)
if gain < safetyGain: ## Gain limitation for safety
sdr.tx_hardwaregain_chan0 = gain
else:
sdr.tx_hardwaregain_chan0 = safetyGain
# savemat('36dBm.mat', {'paInput':paIn, 'paOutput':paOut, 'dpdOutput':dpdOutput, 'dpdModel':dpdModel})
# np.savetxt('outfile.txt', rx_samples.view(complex))
# ### Receive with DPD signal
# In[39]:
rx_samples = sdr.rx()
avg_pwr = np.mean(np.abs(rx_samples/coeff)**2)
avg_pwr_dB = round(10*np.log10(avg_pwr),2)
print("check_rx = ", avg_pwr_dB, ": Best in -30 +/- 5. Else, adjust the rx_hardwaregain")
# Synchronization of input and output
# In[40]:
[paIn, paLin] = corelation(samples, rx_samples, osf)
paLin = coarsePhaseCorrection(paIn, paLin, osf)
paIn /= np.max(np.abs(paIn)) # scaling
paLin /= np.max(np.abs(paLin)) # scaling
# Plot Linearized AM/AM and AM/PM
# In[41]:
amam_ampm_plot(paIn, paLin)
plt.show()
print("nmse after linearization : ", np.round(nmse(paIn, paLin), 2), " dB")
# In[42]:
plt.figure()
plt.subplot(221)
plt.plot(np.real(paIn))
plt.plot(np.real(paLin))
plt.title("I/O in phase")
plt.xlabel("Samples")
plt.subplot(222)
plt.plot(np.imag(paIn))
plt.plot(np.imag(paLin))
plt.title("I/O in quadrature")
plt.xlabel("Samples")
plt.subplot(223)
plt.plot(np.real(paIn)-np.real(paLin))
plt.title("Error in phase (I)")
plt.xlabel("Samples")
plt.subplot(224); plt.plot(np.imag(paIn)-np.imag(paLin))
plt.title("Error in qudadrature (Q)")
plt.xlabel("Samples")
plt.show()
# Compare spectrum with and without DPD
# In[43]:
[pxxWith, f] = toPlotSpectrum(paLin, sample_rate, win_fft, 20, "all")
plt.figure(10)
plt.plot((f + freq)/1e6, 10*np.log10(pxxWith))
plt.grid()
plt.xlabel('Frequency (MHz)'); plt.ylabel('Spectrum'); plt.legend(['With DPD'])
plt.show()
# Stop transmitting
# In[44]:
sdr.tx_destroy_buffer()
# Print the DPD coefficients
# In[45]:
end = time.time()
elapsed = end - start
print(f'Elapsed time : {elapsed:.2}ms')
# In[46]:
for i in dpdModel['coefGMP']:
print(np.round(i,3))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment