General structure taken from Jon's A4 Notebook.

In [1]:
import matplotlib.pyplot as plt # needed for plotting
import numpy as np # numpy is primary library for numeric array (and matrix) handling
import scipy as sp
from scipy import stats, signal
import random
from sklearn import svm # needed for svm
from sklearn.metrics import confusion_matrix
import itertools
from scipy.io import wavfile
import librosa

from sys import byteorder
from array import array
from struct import pack
import time

import pyaudio
import wave

# for messaging
from enum import Enum
import serial 

# Audio Capture

## Capture for Training

## Live Capture for Testing

In [2]:
## audio capture code from : https://stackoverflow.com/questions/892199/detect-record-audio-in-python
# Level (in the units returned by rms()) below which is_silent() treats a chunk as silence.
THRESHOLD = 500
# Number of frames requested per stream.read() call and per PyAudio buffer in record().
CHUNK_SIZE = 1024*4
# Sample format handed to PyAudio when the input stream is opened.
FORMAT = pyaudio.paInt16
# Sampling rate passed to PyAudio and stored on every LiveSound built by record().
RATE = 44100

# record() ends a capture once more than this many consecutive chunks were silent.
silent_cycles_count = 30
# Max-minus-min amplitude thresholds associated with the start/stop segment checks below.
min_max_begin_segment_threshold = 90 
min_max_end_segment_threshold = 25 

# RMS calc for threshold gate.
# https://stackoverflow.com/questions/18406570/python-record-audio-on-detected-sound
def rms(frame):
    """Return the root-mean-square level of a raw audio chunk, scaled by 1000.

    Args:
        frame: bytes-like buffer of native-endian signed 16-bit samples, as
            returned by ``stream.read`` for a ``paInt16`` stream. A trailing
            odd byte (incomplete sample) is ignored.

    Returns:
        float: RMS of the samples after normalising each one to [-1.0, 1.0),
        multiplied by 1000. An empty frame yields 0.0.
    """
    sample_width = 2                  # bytes per signed 16-bit sample
    short_normalize = 1.0 / 32768.0   # maps the int16 range onto [-1.0, 1.0)

    sample_count = len(frame) // sample_width
    if sample_count == 0:
        # Nothing to measure; avoids the division by zero a mean would hit.
        return 0.0

    samples = np.frombuffer(frame, dtype=np.int16, count=sample_count)
    # Widen before squaring so int16 values cannot overflow.
    normalized = samples.astype(np.float64) * short_normalize
    return float(np.sqrt(np.mean(normalized * normalized))) * 1000

def is_silent(snd_data):
    """Tell whether a chunk's RMS level is under the module-level THRESHOLD.

    Returns True when rms(snd_data) is strictly below THRESHOLD, else False.
    """
    level = rms(snd_data)
    return level < THRESHOLD

 
def crosses_threshold(snd_data, threshold):
    """Check whether the peak-to-peak spread of a chunk reaches `threshold`.

    Args:
        snd_data: non-empty sequence/array of samples.
        threshold: minimum (max - min) spread that counts as crossing.

    Returns:
        True when max(snd_data) - min(snd_data) >= threshold.

    Side effects:
        Prints the measured spread.
    """
    highest = np.max(snd_data)
    lowest = np.min(snd_data)
    if isinstance(highest, np.integer):
        # int16 extremes subtracted as NumPy scalars wrap around (e.g.
        # 32767 - (-32768) becomes -1); Python ints cannot overflow.
        highest, lowest = int(highest), int(lowest)
    min_max_diff = abs(highest - lowest)
    print("diff: ", min_max_diff)
    return min_max_diff >= threshold

def is_cross_start_threshold(snd_data):
    """Check whether a chunk is loud enough to begin a segment.

    Args:
        snd_data: non-empty sequence/array of samples.

    Returns:
        True when the peak-to-peak spread (max - min) is at least 90.
    """
    min_max_begin_segment_threshold = 90
    highest = np.max(snd_data)
    lowest = np.min(snd_data)
    if isinstance(highest, np.integer):
        # Subtracting int16 NumPy scalars can wrap around for very loud
        # chunks; do the arithmetic with unbounded Python ints instead.
        highest, lowest = int(highest), int(lowest)
    min_max_diff = abs(highest - lowest)
    return min_max_diff >= min_max_begin_segment_threshold
    
            
    
def is_cross_stop_threshold(snd_data):
    """Check whether a chunk is quiet enough to end a segment.

    Args:
        snd_data: non-empty sequence/array of samples.

    Returns:
        True when the peak-to-peak spread (max - min) is at most the
        module-level `min_max_end_segment_threshold`.
    """
    highest = np.max(snd_data)
    lowest = np.min(snd_data)
    if isinstance(highest, np.integer):
        # int16 scalar subtraction wraps around for a full-swing chunk, which
        # would make a very loud chunk look quiet; use Python ints instead.
        highest, lowest = int(highest), int(lowest)
    min_max_diff = abs(highest - lowest)
    return min_max_diff <= min_max_end_segment_threshold
    
# open port and listen
# when over threshold, begin to store
# when stops, segment and send
def record(trained_model, messenger):
    """Listen on the audio input, classify each detected sound and forward its label.

    Audio is read in CHUNK_SIZE-frame chunks. After a short warm-up, the first
    non-silent chunk starts a capture; the capture ends once more than
    `silent_cycles_count` consecutive chunks are silent. A capture longer than
    RATE samples is wrapped in a LiveSound, classified with
    `trained_model.classifySample`, and the predicted label is handed to
    `messenger.sendMessage`. State is then reset and listening continues.

    The loop runs until it is interrupted (KeyboardInterrupt). The stream and
    the PyAudio instance are released on every exit path, including when
    classification or messaging raises.

    Args:
        trained_model: object exposing `classifySample(sample)` that returns an
            indexable prediction; element 0 is used as the label.
        messenger: object exposing `sendMessage(label)`.

    Returns:
        tuple: (sample_width, samples) where sample_width is the byte width
        PyAudio reports for FORMAT and samples is an ``array('h')`` holding
        the trimmed capture that was in progress when listening stopped
        (empty when none was in progress).
    """
    warmup_chunks = 10    # chunks ignored right after (re)starting to listen
    lead_in_chunks = 10   # chunks kept in front of the chunk that triggered a capture
    tail_chunks = 20      # chunks dropped from the end of a capture

    num_silent = 0
    snd_started = False
    startNum = 0
    r = array('h')
    count = -1

    p = pyaudio.PyAudio()
    try:
        stream = p.open(format=FORMAT, channels=1, rate=RATE,
                        input=True, output=True,
                        frames_per_buffer=CHUNK_SIZE)
        try:
            sample_width = p.get_sample_size(FORMAT)
            print("listening")

            while True:
                count += 1

                # little endian, signed short
                frame = stream.read(CHUNK_SIZE)
                snd_data = array('h', frame)
                if byteorder == 'big':
                    snd_data.byteswap()

                # Every chunk is buffered (even silence) because startNum
                # indexes into r by chunk number.
                r.extend(snd_data)

                # Measure once and reuse the value for both the log line and
                # the silence decision (same comparison is_silent() performs).
                activity_check = rms(frame)
                print("activity: {}".format(activity_check))
                silent = activity_check < THRESHOLD

                if count <= warmup_chunks:
                    continue
                if silent and not snd_started:
                    continue

                if not snd_started:
                    # First non-silent chunk after the warm-up: start a capture.
                    print("recording")
                    snd_started = True
                    startNum = count
                elif silent:
                    num_silent += 1
                else:
                    num_silent = 0

                if snd_started and num_silent > silent_cycles_count:
                    # r always holds at least startNum + 1 chunks here, so
                    # this equals len(r[startNum * CHUNK_SIZE:]) without the copy.
                    captured_len = len(r) - startNum * CHUNK_SIZE
                    if captured_len > RATE:
                        print("stop: silent too long")
                        trimmed_r = r[((startNum - lead_in_chunks) * CHUNK_SIZE):(-tail_chunks * CHUNK_SIZE)]
                        sample = LiveSound(trimmed_r, RATE)
                        classification = trained_model.classifySample(sample)[0]
                        print(classification)
                        messenger.sendMessage(classification)
                    else:
                        print("too short: ", len(r))

                    # Reset and go back to listening (this restarts the warm-up).
                    count = -1
                    num_silent = 0
                    r = array('h')
                    snd_started = False

                    print("listening")
        except KeyboardInterrupt:
            # Interrupting the kernel is the intended way to stop listening.
            print("stop: interrupted")
        finally:
            stream.stop_stream()
            stream.close()
            print("close")
    finally:
        p.terminate()

    if snd_started:
        return sample_width, r[((startNum - lead_in_chunks) * CHUNK_SIZE):(-tail_chunks * CHUNK_SIZE)]
    return sample_width, array('h')


# Data Processing and Classification

In [3]:
# Add preprocessing functions here if needed
def preprocess(rawSignal):
    """Placeholder preprocessing stage.

    No preprocessing is applied yet: the very same object that was passed in
    is handed back. Add filtering/normalisation steps here if needed.
    """
    processed = rawSignal
    return processed

class LiveSound:
    """An audio sample captured live, shaped like a SoundTrial for feature extraction.

    Attributes set by the constructor:
        sound: the samples as a NumPy array.
        sound_float: a separate float copy of the samples.
        sound_p: result of preprocess() applied to the data as passed in.
        sample_rate: the sampling rate supplied by the caller.
        soundClassification: initialised to None.
    """

    def __init__(self, data, sampleRate):
        samples = np.array(data)

        self.sound = samples
        self.sound_float = samples.astype(float)
        self.sound_p = preprocess(data)
        self.sample_rate = sampleRate

        self.soundClassification = None


class SoundTrial:
    """One recorded trial of a named sound, loaded from a wav file.

    The constructor reads the file with scipy's wavfile.read and keeps the
    samples (`sound`), a float copy (`sound_float`), the preprocess() result
    (`sound_p`) and the file's sampling rate (`sample_rate`), alongside the
    sound name, trial number and file location.
    """

    def __init__(self, soundName, trialNum, filenameWithPath):
        rate, samples = wavfile.read(filenameWithPath)

        # identification
        self.soundName = soundName
        self.trialNum = trialNum
        self.filenameWithPath = filenameWithPath
        self.filename = os.path.basename(filenameWithPath)

        # audio payload
        self.sound = samples
        self.sound_float = samples.astype(float)
        self.sound_p = preprocess(samples)
        self.sample_rate = rate

    def __str__(self):
        description = "'{}' : Trial {} from {}"
        return description.format(self.soundName, self.trialNum, self.filename)
      
    
    
class SoundSet:
    """A directory's worth of sound trials, grouped by sound name.

    Attributes:
        path: the path (or directory name) this set was loaded from.
        map_sounds_to_trials: dict mapping a sound name to its list of trials.

    Several methods keep their original "gesture" names for compatibility
    with existing callers; they operate on sounds (a "gesture name" is a
    sound name) and on the trials' audio attributes.
    """

    def __init__(self, sound_sample_path, map_sounds_to_trials):
        self.path = sound_sample_path
        self.map_sounds_to_trials = map_sounds_to_trials

    # returns the base path
    def get_base_path(self):
        """Return the last component of `path`, ignoring any trailing separator."""
        return os.path.basename(os.path.normpath(self.path))

    # returns the number of sounds
    def get_num_sounds(self):
        """Return how many distinct sound names are in the set."""
        return len(self.map_sounds_to_trials)

    # returns the total number of trials
    def get_total_num_of_trials(self):
        """Return the number of trials summed over all sounds."""
        return sum(len(trialSet) for trialSet in self.map_sounds_to_trials.values())

    # returns a sorted list of sound names
    def get_sound_names_sorted(self):
        """Return the sound names in sorted order."""
        return sorted(self.map_sounds_to_trials.keys())

    # returns the longest trial (based on number of samples and not clock time)
    def get_longest_trial(self):
        """Return the trial whose `sound` has the most samples (None if there are no trials).

        When several trials tie, the first one encountered is returned.
        """
        all_trials = (trial
                      for trial_list in self.map_sounds_to_trials.values()
                      for trial in trial_list)
        return max(all_trials, key=lambda trial: len(trial.sound), default=None)

    # returns trials for a sound name
    def get_trials_for_gesture(self, gesture_name):
        """Return the list of trials stored under `gesture_name` (KeyError if unknown)."""
        return self.map_sounds_to_trials[gesture_name]

    # creates an aggregate signal based on *all* trials for this sound
    # TODO: in future could add in an argument, which takes a list of trial nums
    # to use to produce aggregate signal
    def create_aggregate_signal(self, gesture_name, signal_var_name):
        """Return the element-wise mean of one signal attribute over all trials of a sound.

        Args:
            gesture_name: sound name whose trials are aggregated.
            signal_var_name: name of the trial attribute holding the signal
                (for example "sound_float").

        Every signal is padded at the end with its own mean up to the length
        of the longest one. Trials after the first are additionally passed
        through `get_aligned_signal_cutoff_and_pad(signal, first_signal)`.
        NOTE(review): that helper is not defined in the visible part of this
        notebook; it must be available when a sound has more than one trial.

        Returns:
            numpy.ndarray of floats: the mean signal.

        Raises:
            ValueError: if the sound has no trials.
        """
        trials = self.get_trials_for_gesture(gesture_name)
        if not trials:
            raise ValueError("no trials recorded for '{}'".format(gesture_name))

        trial_signals = [np.asarray(getattr(trial, signal_var_name)) for trial in trials]
        max_length = max(len(trial_signal) for trial_signal in trial_signals)

        def pad_to_max(trial_signal):
            return np.pad(trial_signal, (0, max_length - len(trial_signal)), 'mean')

        first_trial_signal = pad_to_max(trial_signals[0])
        # Accumulate into a separate float array: adding in place to the first
        # signal would corrupt the reference the other trials are aligned to.
        aggregate_signal = first_trial_signal.astype(float)

        for cur_trial_signal in trial_signals[1:]:
            cur_trial_signal_mod = get_aligned_signal_cutoff_and_pad(
                pad_to_max(cur_trial_signal), first_trial_signal)
            aggregate_signal += cur_trial_signal_mod

        return aggregate_signal / len(trial_signals)

    # Returns the minimum number of trials across all sounds (just in case we accidentally recorded a
    # different number. We should have the same number of trials across all sounds)
    def get_min_num_of_trials(self):
        """Return the smallest per-sound trial count, or -1 when the set has no sounds."""
        return min((len(trialSet) for trialSet in self.map_sounds_to_trials.values()),
                   default=-1)

    # get random sound name
    def get_random_gesture_name(self):
        """Return a randomly chosen sound name."""
        return random.choice(list(self.map_sounds_to_trials.keys()))

    # get random trial
    def get_random_trial(self):
        """Return a random trial of a randomly chosen sound (prints the chosen name)."""
        rand_gesture_name = self.get_random_gesture_name()
        print("rand_gesture_name", rand_gesture_name)
        return random.choice(self.map_sounds_to_trials[rand_gesture_name])

    # prettify the str()
    def __str__(self):
        return "'{}' : {} sounds and {} total trials".format(
            self.path, self.get_num_sounds(), self.get_total_num_of_trials())
        

In [4]:
from os import listdir
import ntpath
import os

# From: https://stackoverflow.com/questions/800197/how-to-get-all-of-the-immediate-subdirectories-in-python
def get_immediate_subdirectories(a_dir):
    """Return the names (not full paths) of the directories directly inside `a_dir`."""
    subdirectory_names = []
    for entry in os.listdir(a_dir):
        entry_path = os.path.join(a_dir, entry)
        if os.path.isdir(entry_path):
            subdirectory_names.append(entry)
    return subdirectory_names

# Lists the entries of a directory whose names end with `suffix` (".wav" by default).
# No other filtering is applied to the names.
def find_wav_filenames( path_to_dir, suffix=".wav" ):
    """Return the names in `path_to_dir` that end with `suffix`."""
    matching_names = []
    for candidate in listdir(path_to_dir):
        if candidate.endswith(suffix):
            matching_names.append(candidate)
    return matching_names

def parse_and_create_sound_trials( path_to_dir ):
    """Load every wav file in a directory into SoundTrial objects grouped by sound name.

    File names are expected to look like ``<soundName>_<sampleNum>[_...].wav``.
    Files without an underscore-separated sample number are reported and
    skipped. If two files share a sound name and sample number, the one
    listed later replaces the earlier one.

    Args:
        path_to_dir: directory containing the wav files.

    Returns:
        dict: sound name -> list of SoundTrial, ordered by sample number
        (numeric sample numbers in numeric order, so "2" comes before "10";
        any non-numeric ones afterwards in string order). Trial numbers are
        assigned 0, 1, 2, ... in that order.
    """
    wavFilenames = find_wav_filenames(path_to_dir)

    print("Found {} wav files in {}".format(len(wavFilenames), path_to_dir))

    # soundName -> {sampleNum (str) -> wav filename}
    mapSoundNameToMapSampleNum = dict()
    for wavFilename in wavFilenames:

        # parse filename into meaningful parts
        filenameNoExt = os.path.splitext(wavFilename)[0]
        filenameParts = filenameNoExt.split("_")
        if len(filenameParts) < 2:
            print("Skipping '{}': expected '<soundName>_<sampleNum>[_...].wav'".format(wavFilename))
            continue

        soundName = filenameParts[0]
        sampleNum = filenameParts[1]
        mapSoundNameToMapSampleNum.setdefault(soundName, dict())[sampleNum] = wavFilename

    print("Found {} sounds".format(len(mapSoundNameToMapSampleNum)))

    def sample_order(sampleNum):
        # Sample numbers are strings; compare numeric ones as integers so that
        # "10" does not sort ahead of "2". The raw string breaks ties ("1" vs "01").
        if sampleNum.isdecimal():
            return (0, int(sampleNum), sampleNum)
        return (1, 0, sampleNum)

    # Now loop through the data and order each sound's files by sample number
    # (so that we have trial 1, 2, 3, etc. in order)
    mapSoundNameToTrialList = dict()
    for soundName, mapSampleNumToFile in mapSoundNameToMapSampleNum.items():
        trialList = list()
        orderedSampleNums = sorted(mapSampleNumToFile.keys(), key=sample_order)
        for soundTrialNum, sampleNum in enumerate(orderedSampleNums):
            filenameWithPath = os.path.join(path_to_dir, mapSampleNumToFile[sampleNum])
            trialList.append(SoundTrial(soundName, soundTrialNum, filenameWithPath))
        mapSoundNameToTrialList[soundName] = trialList

        print("Found {} trials for '{}'".format(len(trialList), soundName))

    return mapSoundNameToTrialList


In [11]:
# Load the data
class MapSoundSets:
    """Loads every sound set found under a root directory and selects one of them.

    Attributes (set by load_data):
        map_sound_sets: dict mapping each sound set's base path to its SoundSet.
        selected_sound_set: the SoundSet whose directory name contains the
            target word (the last match wins), or None when nothing matches.
    """

    def __init__(self,rootSoundSamplePath="./SoundSamples", targetDirWord="StevenTreshold"):
        #sets map_sound_sets
        self.load_data(rootSoundSamplePath, targetDirWord)

    def load_data(self, rootSoundSamplePath, targetDirWord):
        """Read each immediate subdirectory of the root as a SoundSet.

        Args:
            rootSoundSamplePath: directory whose immediate subdirectories each
                hold one sound set.
            targetDirWord: substring identifying the directory to select.
        """
        # Scan the root once and reuse the listing for the log line and the loop.
        sound_sample_paths = get_immediate_subdirectories(rootSoundSamplePath)
        print(sound_sample_paths)

        self.map_sound_sets = dict()
        self.selected_sound_set = None

        for sound_sample_path in sound_sample_paths:
            path_to_sound_sample = os.path.join(rootSoundSamplePath, sound_sample_path)
            print("\nReading in:", path_to_sound_sample)
            map_sounds_to_trials = parse_and_create_sound_trials(path_to_sound_sample)
            sound_set = SoundSet(sound_sample_path, map_sounds_to_trials)
            self.map_sound_sets[sound_set.get_base_path()] = sound_set
            if targetDirWord in sound_sample_path:
                self.selected_sound_set = sound_set

        if self.selected_sound_set is not None:
            print("\nThe selected sound set:", self.selected_sound_set)
        else:
            # Make the miss visible here instead of failing later on a None set.
            print("\nWarning: no sound set directory name contains '{}'; "
                  "selected_sound_set is None".format(targetDirWord))

    def get_sound_set_with_str(self, targetStr):
        """Return the first loaded SoundSet whose base path contains `targetStr`, else None."""
        for base_path, sound_set in self.map_sound_sets.items():
            if targetStr in base_path:
                return sound_set
        return None


In [6]:
#%%time

# This is the simplest possible SVM using only a few features but gives you a sense of the overall approach
# Some nice resources:
#  - A very simple classification example using scikit: 
#     https://dbaumgartel.wordpress.com/2014/03/10/a-scikit-learn-example-in-10-lines/
#  - A nice video overview of SVM: https://youtu.be/N1vOgolbjSc
#  - Official sci-kit learn: http://scikit-learn.org/stable/modules/svm.html

from sklearn import svm
from sklearn.preprocessing import StandardScaler
import itertools
from scipy.stats import kurtosis, skew

# Returns a feature vector for the given trial
def extract_features(sample):
    """Build one flat feature vector from a sample's audio.

    Reads `sample.sound_float` and `sample.sample_rate`, computes four librosa
    feature matrices, averages each across its second axis, and concatenates
    the results in this order: MFCC (n_mfcc=50), chroma, spectral contrast,
    zero-crossing rate.
    """
    audio = sample.sound_float
    rate = sample.sample_rate

    def average_over_frames(feature_matrix):
        return np.mean(feature_matrix.T, axis=0)

    magnitude_spectrogram = np.abs(librosa.stft(audio))

    mfcc_part = average_over_frames(librosa.feature.mfcc(y=audio, sr=rate, n_mfcc=50))
    chroma_part = average_over_frames(
        librosa.feature.chroma_stft(S=magnitude_spectrogram, sr=rate))
    # Left out on purpose:
    #  - mel spectrogram: made everything seem like "Toaster"
    #  - tonnetz: adds some accuracy but takes forever
    contrast_part = average_over_frames(
        librosa.feature.spectral_contrast(S=magnitude_spectrogram, sr=rate))
    zero_crossing_part = average_over_frames(librosa.feature.zero_crossing_rate(audio))

    return np.hstack([mfcc_part, chroma_part, contrast_part, zero_crossing_part])

class TrainedModel:
    """A classifier fitted on the features of every trial in a sound set.

    Attributes:
        selected_sound_set: the sound set used for training.
        numSounds / numTrialsTotal: counts reported by the sound set.
        trainingData: 2-D array with one feature row per trial.
        classLabels: 1-D array with the sound name of each row.
        model: the fitted classifier.
    """

    # Row width used for the (empty) training matrix when the set has no trials.
    NUM_FEATURES = 70

    def __init__(self, selectedSet, modelType = "svm", targetedString="Test"):
        """Extract features for every trial of `selectedSet` and fit the model.

        Args:
            selectedSet: sound set exposing `map_sounds_to_trials`,
                `get_num_sounds()` and `get_total_num_of_trials()`.
            modelType: "svm" selects the linear-kernel SVC; any other value
                falls back to a default-constructed SVC.
            targetedString: accepted for compatibility; not used.

        Raises:
            ValueError: if `selectedSet` is None.
        """
        if selectedSet is None:
            raise ValueError("selectedSet is None: no sound set was selected to train on")

        self.selected_sound_set = selectedSet
        self.numSounds = self.selected_sound_set.get_num_sounds()
        self.numTrialsTotal = self.selected_sound_set.get_total_num_of_trials()

        # Collect rows in lists and stack once; growing the arrays inside the
        # loop re-copies everything gathered so far on every trial.
        feature_rows = []
        labels = []
        for trainingSoundName, trainingTrials in self.selected_sound_set.map_sounds_to_trials.items():
            for trial in trainingTrials:
                features = extract_features(trial)
                print("Got features from {} #{}...".format(trainingSoundName, trial.trialNum))
                feature_rows.append(features)
                labels.append(trainingSoundName)

        if feature_rows:
            self.trainingData = np.vstack(feature_rows)
            self.classLabels = np.array(labels)
        else:
            self.trainingData = np.empty((0, self.NUM_FEATURES))
            self.classLabels = np.empty(0)

        print("\n\n\nFitting data...\n")

        self._trainModel(modelType)

    def _trainModel(self, modelType):
        """Create the classifier for `modelType` and fit it on the training data."""
        if modelType == "svm":
            # A linear kernel is used here. gamma is a coefficient for the
            # 'rbf', 'poly' and 'sigmoid' kernels, so it only matters if the
            # kernel is switched to one of those (then tune gamma and C).
            # see:
            # - https://scikit-learn.org/stable/auto_examples/svm/plot_rbf_parameters.html#sphx-glr-auto-examples-svm-plot-rbf-parameters-py
            # - https://scikit-learn.org/stable/modules/generated/sklearn.svm.SVC.html
            self.model = svm.SVC(kernel='linear', gamma=0.01) # kernel='rbf'
        else:
            self.model = svm.SVC()

        self.model.fit(self.trainingData, self.classLabels)

    def classifySample(self, liveSoundSample):
        """Return the model's prediction (a one-element sequence) for a single sample."""
        features = extract_features(liveSoundSample)
        return self.model.predict([features])
     


# Messaging Arduino

In [7]:
class Messenger():
    """Sends a one-character code for a classified sound over a serial connection.

    The constructor opens the serial port at 9600 baud (`arduinoData`) and
    builds `sounds`, the mapping from classification label to the character
    that is written for it.
    """

    def __init__(self, port="/dev/cu.usbmodem1411"):
        self.arduinoData = serial.Serial(port, 9600)

        label_codes = [
            ('GarbageDisposal', 'g'),
            ('MicrowaveDoorClose', 'm'),
            ('MicrowaveDoorOpen', 'w'),
            ('MicrowaveEnding', 'e'),
            ('Toaster', 't'),
            ('FridgeDoorOpen', 'f'),
            ('CoffeeGrinder', 'c'),
            ('FridgeDoorClose', 'r'),
        ]
        self.sounds = dict(label_codes)

    def sendMessage(self, classification):
        """Write the code for `classification` to the serial port (KeyError if unknown)."""
        code = self.sounds[classification]
        self.arduinoData.write(code.encode('utf-8'))
        


# Main

In [10]:
# main
# Load every sound set under the default root directory; MapSoundSets also picks
# the set whose directory name contains its default target word.
map_sound_sets = MapSoundSets()
# Train on the selected set. NOTE(review): TrainedModel.__init__ accepts
# targetedString but never reads it, so "MyKitchen" does not choose the training
# set here -- the set comes from map_sound_sets.selected_sound_set (None when no
# directory matched).
trained_model = TrainedModel(map_sound_sets.selected_sound_set, targetedString="MyKitchen")

# Offline check kept for reference: classify a stored wav file instead of live audio.
#test = SoundTrial("coffee", 0, "./SoundSamples/6secSample/CoffeeGrinder_0_captured.wav")
#testLive = LiveSound(test.sound, test.sample_rate)
#print(type(testLive.sound))
# Messenger opens the serial port when constructed; the port name is machine-specific.
messenger = Messenger("/dev/cu.usbmodem1421")
# Blocks while listening on the microphone, classifying each captured sound and
# sending its code through the messenger.
record(trained_model, messenger)
#print(trained_model.classifySample(testLive))

['6secSample', '6Test', 'Annie', 'ATest', 'Basic', 'MyKitchen', 'SMono', 'Steven', 'thresholdStop']

Reading in: ./Samples\6secSample
Found 49 wav files in ./Samples\6secSample
Found 8 sounds
Found 6 trials for 'CoffeeGrinder'
Found 6 trials for 'FridgeDoorClose'
Found 6 trials for 'FridgeDoorOpen'
Found 6 trials for 'GarbageDisposal'
Found 6 trials for 'MicrowaveDoorClose'
Found 6 trials for 'MicrowaveDoorOpen'
Found 6 trials for 'MicrowaveEnding'
Found 7 trials for 'Toaster'

Reading in: ./Samples\6Test
Found 48 wav files in ./Samples\6Test
Found 8 sounds
Found 6 trials for 'CoffeeGrinder'
Found 6 trials for 'FridgeDoorClose'
Found 6 trials for 'FridgeDoorOpen'
Found 6 trials for 'GarbageDisposal'
Found 6 trials for 'MicrowaveDoorClose'
Found 6 trials for 'MicrowaveDoorOpen'
Found 6 trials for 'MicrowaveEnding'
Found 6 trials for 'Toaster'

Reading in: ./Samples\Annie
Found 37 wav files in ./Samples\Annie
Found 6 sounds
Found 7 trials for 'CoffeeGrinder'
Found 6 trials for 'FridgeDoo

SerialException: could not open port '/dev/cu.usbmodem1421': FileNotFoundError(2, 'The system cannot find the path specified.', None, 3)