First DSP Attempts

Well, since my last post on detecting calls of attention using my microphone, I have been paying attention to DSP since it seems to be one cool topic to spend 3 months on. So, I decided to begin reading the dspguide and found some cool stuff which I decided to tinker with. After reading about convolution, I decided to implement some basic filters that would help me amplify, add echoes and so on. First, implementations of convolution, the input side algorithm and the output side algorithm:

The input side algorithm:

def convolute_inside(impulse_response):
        global output_signal
        global input_signal
        len_out_signal = len(input_signal)+len(impulse_response)-1
        output_signal = [0 for x in xrange(len_out_signal)]
        for i in xrange(len(input_signal)):
                for j in xrange(len(impulse_response)):
                        output_signal[i+j] = output_signal[i+j] + input_signal[i]*impulse_response[j]
 

The output side algorithm:

def convolute_outside(impulse_response):
        global output_signal
        global input_signal
        len_out_signal = len(input_signal)+len(impulse_response)-1
        for i in xrange(len_out_signal):
                output_signal.append(0)
        for i in xrange(len_out_signal):
                for j in xrange(len(impulse_response)):
                        #print i, j
                        if not (i-j)<0 and not (i-j)>len(input_signal)-1:
                                output_signal[i] += impulse_response[j] * input_signal[i-j]

 

Next, using appropriate filters, we can add echoes and amplify the input signal. So, we first need to read in a wav file. Using my previous script, that should be simple:
Reading a wav file:

def readWavFile(input):
        inFile = wave.open(input, "rb")
        sample_rate = inFile.getframerate()
        total_samples = inFile.getnframes()
        vals = inFile.readframes(total_samples)
        inFile.close()
        return struct.unpack("%dh"%(total_samples), vals)
 

This next function adds an echo to the input signal:
Add an echo:

def add_echo():
        ‘Amounts to scaling and shifting the delta function and then adding it to a delta function’
       
        #considering the intensity of the echo to be 60% of that of the original signal and delayed by 1000 samples:
       
        impulse_response = [0 for x in xrange(1003)] #shifted and scaled delta function + delta function
        impulse_response[0] = 1
        impulse_response[-1] = 0.6
        convolute_inside(impulse_response)

 

The above function considers that an echo occurs a 1000 samples after the current one and with an intensity 6 tenths of the original signal. Since testing this becomes a serious pain, I decided to test my echo function using a different input signal and impulse response (echo is 6/10 of the original intensity and 4 samples away).

def add_new_echo():
        ‘Amounts to scaling and shifting the delta function and then adding it to a delta function’
       
        #considering the intensity of the echo to be 60% of that of the original signal and delayed by 4 samples:
       
        impulse_response = [1,0,0,0,0.6] #shifted and scaled delta function + delta function
        convolute_inside(impulse_response)

input_signal = [1,2,3,4]
#print input_signal
add_new_echo()
print output_signal
 

The output of this is:

mouse-den% python convolution.py
[1, 2, 3, 4, 0.59999999999999998, 1.2, 1.7999999999999998, 2.3999999999999999]

The impulse response for amplifying a signal would be a scaled delta function, so:

def amplify():
        ‘Make the impulse function a scaled delta function’
       
        impulse_response = [2]
        convolute_inside(impulse_response)
 

The code to write to a wave file should be pretty straightforward:

def writeWavFile(out):
        outStream = wave.open(out, "wb")
        outStream.setnchannels(1)
        outStream.setsampwidth(2)
        outStream.setframerate(44100)
        data = ""
        for i in xrange(len(output_signal)):
                data += struct.pack(‘h’, output_signal[i])
        outStream.writeframes(data)
        outStream.close()

So, here is the test wave file: test.wav

And the amplified wav file: output1.wav

Now, to the part where we try to figure out if a signal contains another signal.

Turns out, to do that, you just need to obtain the cross-correlation of the input signal and the waveform we already have.

So, the code to do this for a signal that goes like [1,2,3,4] is:

def cross_correlate(input_file=""):
        ‘Cross correlate, obtain the graph and find the peak’
       
        global input_signal
        global output_signal
        #input_signal = record() #this procedure reads in the signal we need to find
        input_signal = [1,2,3,4,11,0,0,1,10,5,2]
        impulse_response = [1,2,3,4] #read in our signal
        impulse_response.reverse()
        convolute_outside(impulse_response)
        print output_signal
 

This results in a signal that looks like:

[4, 11, 20, 30, 64, 44, 26, 15, 43, 52, 44, 26, 9, 2]

So, I had to figure out an algorithm to detect the peak and I found one on stackoverflow.com and it goes like this:

for i in xrange(len(output_signal)-1):
                travel = output_signal[i+1] – output_signal[i]
                rise = output_signal[len(output_signal)-1] – output_signal[0]
               
                if (travel/rise) > 1:
                        #print output_signal[i]
                        peak_options[output_signal[i]]=i

        return peak_options[max(peak_options.keys())]   
 

In other news, I am going to work in Prof. Kihara’s bio-informatics laboratory this fall and I hope it works out.

3 responses to “First DSP Attempts”

  1. Robert

    It’s slightly insane to do the DSP in pure python. Numpy & Scipy have prebuilt optimized routines for most of these things.

  2. Gary

    I recommend numpy for DSP in python. Lots of tools and goes fast.

Leave a Reply


*