Scheduling and Pipelining/Retiming

When a schedule is cyclic (modulo scheduling), pipelining/retiming takes place implicitly. This example uses B-ASIC to demonstrate it.

import matplotlib.pyplot as plt
import numpy as np
from scipy import signal

from b_asic.core_operations import Addition, ConstantMultiplication
from b_asic.schedule import Schedule
from b_asic.scheduler import ALAPScheduler
from b_asic.sfg_generators import direct_form_1_iir
from b_asic.signal_generator import Impulse
from b_asic.simulation import Simulation

Design a simple direct form IIR low-pass filter.

N = 3
Wc = 0.2
b, a = signal.butter(N, Wc, btype="lowpass", output="ba")

Generate the corresponding signal-flow-graph (SFG).

sfg = direct_form_1_iir(b, a)
sfg
[Figure: rendered graph of the original SFG]


Set latencies and execution times of the operations.

sfg.set_latency_of_type(Addition, 1)
sfg.set_latency_of_type(ConstantMultiplication, 3)
sfg.set_execution_time_of_type(Addition, 1)
sfg.set_execution_time_of_type(ConstantMultiplication, 1)

Print the critical path Tcp and the iteration period bound Tmin.

T_cp = sfg.critical_path_time()
print("Tcp:", T_cp)
T_min = sfg.iteration_period_bound()
print("Tmin:", T_min)
Tcp: 7
Tmin: 5
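
The two printed values can be checked by hand. The sketch below is plain Python and does not use B-ASIC: the loops and paths are read off the SFG shown earlier, the path list is hand-picked rather than exhaustive, and `path_latency` is a hypothetical helper. It assumes the usual definitions, namely that the iteration period bound is the largest ratio of loop latency to the number of delays in the loop, and that the critical path is the longest delay-free path.

```python
from fractions import Fraction

# Latencies set in this example (inputs, outputs and delays contribute 0).
LATENCY = {"add": 1, "cmul": 3}


def path_latency(ops):
    """Sum the latencies of operation names such as 'cmul1' or 'add3'."""
    return sum(LATENCY[name.rstrip("0123456789")] for name in ops)


# Recursive loops of the filter: (operations in the loop, delays in the loop).
loops = [
    (["cmul1", "add3", "add2"], 1),
    (["cmul2", "add4", "add3", "add2"], 2),
    (["cmul3", "add4", "add3", "add2"], 3),
]

# Iteration period bound: maximum over loops of latency / number of delays.
t_min = max(Fraction(path_latency(ops), delays) for ops, delays in loops)

# Delay-free paths ending at add2 (hand-picked, not exhaustive).
paths = [
    ["cmul0", "add0", "add2"],
    ["cmul4", "add1", "add0", "add2"],
    ["cmul5", "add5", "add1", "add0", "add2"],
    ["cmul6", "add5", "add1", "add0", "add2"],
    ["cmul1", "add3", "add2"],
    ["cmul2", "add4", "add3", "add2"],
]
t_cp = max(path_latency(ops) for ops in paths)

print("Tcp:", t_cp)
print("Tmin:", t_min)
```

The loop through `cmul1`, `add3` and `add2` contains a single delay and has latency 5, which is what sets the bound; the longest delay-free path runs through the feed-forward part of the filter.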

Create a cyclic ALAP (as late as possible) schedule.

schedule = Schedule(sfg, scheduler=ALAPScheduler(), cyclic=True)
schedule.show()
[Figure: the ALAP schedule]

Move some operations “over the edge” of the schedule, then reduce the schedule time to 5 in order to reach Tcp = Tmin.

schedule.move_operation('out0', 2)
schedule.move_operation('add2', 2)
schedule.move_operation('add0', 2)
schedule.move_operation('add3', 2)
schedule.set_schedule_time(5)
schedule.show()
[Figure: the schedule after moving the operations, with schedule time 5]
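
One way to picture what moving an operation “over the edge” means: in a cyclic schedule, a start time at or beyond the schedule time wraps around modulo the schedule time, and each wrap means the result belongs to a later iteration, which corresponds to an extra delay on the signal. The plain-Python sketch below illustrates only this arithmetic. `wrap_start_time` is a hypothetical helper, not part of B-ASIC, and the start times are made up rather than taken from the schedule above.

```python
def wrap_start_time(start_time, schedule_time):
    """Return (laps, wrapped_time) for a start time in a cyclic schedule.

    laps counts how many times the schedule boundary was crossed;
    wrapped_time is the position within one schedule period.
    """
    laps, wrapped_time = divmod(start_time, schedule_time)
    return laps, wrapped_time


schedule_time = 5  # schedule time used in this example

# Illustrative start times (assumed values, not read from the schedule).
for start_time in (3, 5, 6, 11):
    laps, wrapped = wrap_start_time(start_time, schedule_time)
    print(f"start {start_time}: slot {wrapped}, crossed the edge {laps} time(s)")
```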

Print the new critical path Tcp, which is now equal to Tmin.

T_cp = schedule.sfg.critical_path_time()
print("Tcp:", T_cp)
T_min = schedule.sfg.iteration_period_bound()
print("Tmin:", T_min)
Tcp: 5
Tmin: 5

Show the SFG reconstructed from the schedule, which is now pipelined/retimed compared to the original.

schedule.sfg
[Figure: rendered graph of the reconstructed, pipelined/retimed SFG]


Simulate the impulse responses of the original and reconstructed SFGs. Plot the magnitude responses of the original filter, the original SFG and the reconstructed SFG to verify that the schedule is valid.

sim1 = Simulation(sfg, [Impulse()])
sim1.run_for(1000)

sim2 = Simulation(schedule.sfg, [Impulse()])
sim2.run_for(1000)

w, h = signal.freqz(b, a)

# Plot 1: Original filter
spectrum_0 = 20 * np.log10(np.abs(h))
plt.figure()
plt.plot(w / np.pi, spectrum_0)
plt.title("Original filter")
plt.xlabel("Normalized frequency (x pi rad/sample)")
plt.ylabel("Magnitude (dB)")
plt.grid(True)
plt.show()

# Plot 2: Simulated SFG
spectrum_1 = 20 * np.log10(np.abs(signal.freqz(sim1.results['0'])[1]))
plt.figure()
plt.plot(w / np.pi, spectrum_1)
plt.title("Simulated SFG")
plt.xlabel("Normalized frequency (x pi rad/sample)")
plt.ylabel("Magnitude (dB)")
plt.grid(True)
plt.show()

# Plot 3: Pipelined/retimed (reconstructed) SFG
spectrum_2 = 20 * np.log10(np.abs(signal.freqz(sim2.results['0'])[1]))
plt.figure()
plt.plot(w / np.pi, spectrum_2)
plt.title("Pipelined/retimed SFG")
plt.xlabel("Normalized frequency (x pi rad/sample)")
plt.ylabel("Magnitude (dB)")
plt.grid(True)
plt.show()
[Figures: magnitude responses titled "Original filter", "Simulated SFG" and "Pipelined/retimed SFG"]
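
Besides comparing plots by eye, the same kind of check can be made numerically by comparing impulse responses sample by sample. As a minimal sketch of why retiming is expected to leave the response unchanged, the toy first-order recursive filter below (not the filter designed in this example, and not using B-ASIC) moves a delay from the input to the output of a multiplier. Both function names are hypothetical.

```python
def original(x, a):
    """y[n] = x[n] + a * y[n-1], with the delay placed before the multiplier."""
    delay = 0.0
    y = []
    for sample in x:
        out = sample + a * delay  # multiply the delayed output
        delay = out               # the delay stores y[n]
        y.append(out)
    return y


def retimed(x, a):
    """Same filter with the delay moved to the multiplier output."""
    delay = 0.0
    y = []
    for sample in x:
        out = sample + delay      # the delay already holds a * y[n-1]
        delay = a * out           # multiply first, then store
        y.append(out)
    return y


impulse = [1.0] + [0.0] * 19
a = 0.5
h_original = original(impulse, a)
h_retimed = retimed(impulse, a)

# Largest sample-wise deviation between the two impulse responses.
max_error = max(abs(p - q) for p, q in zip(h_original, h_retimed))
print("max abs difference:", max_error)
```

The two structures differ only in where the delay sits inside the loop, so the number of delays in the loop and hence the input-output behaviour are the same.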
