#!/usr/bin/python
#
# Created by dancejic.com (GD and ND).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details: <https://www.gnu.org/licenses/>.
#
# TITLE:        rotaryPhoneStateMachine.py
# Description:  Implementing rotary phone with Raspberry Pi and SIM868 HAT
# Usage:        python rotaryPhoneStateMachine.py
#
# $Revision: \main\1 $
#
# Revision notes:
#   rev 1   Adapted from initial implementation and prepared for Instructables
#
# CONTRIBUTORS:
#   GD  Goran Dancejic
#   ND  Nikola Dancejic
#******************************************************************************
import serial
import time
from gpiozero import LED, Button
import os
import re

#******************************************************************************
# common objects/constants/functions
#

# Name of the state the main loop executes next (key of phone_state_machine).
phone_state = "IDLE"

# GPIO inputs.  The state functions treat handset_up.is_pressed as
# "handset lifted"; dial_pulse and dial_turned drive the two callbacks below.
handset_up = Button(11)
dial_turned = Button(26)
dial_pulse = Button(19)

# GPIO outputs, both active-low and initially inactive.
ring_on = LED(25, active_high=False, initial_value=False)
SIM868_run = LED(10, active_high=False, initial_value=False)

# Dial bookkeeping shared between the GPIO callbacks and the state functions.
no_of_pulses = 0                # pulses counted since the last reset
dial_turn_completed = False     # True once a valid dial turn was registered


def count_pulses():
    """Count one dial pulse; restart from zero when the count exceeds 12."""
    global no_of_pulses
    no_of_pulses = no_of_pulses + 1
    if no_of_pulses <= 12:
        return
    # more than 12 pulses cannot be a valid dial turn
    print("error reading pulses")
    no_of_pulses = 0


def single_dial_turn_completed():
    """Flag a finished dial turn if between 1 and 12 pulses were counted."""
    global dial_turn_completed
    if 0 < no_of_pulses <= 12:
        dial_turn_completed = True


dial_pulse.when_pressed = count_pulses
dial_turned.when_pressed = single_dial_turn_completed

# timing constants (seconds)
PHONE_TIMEOUT = 30.0
HALF_SEC = 0.5
TENT_SEC = 0.1

# basic call handling
AT_TEST = 'AT\r\n'
ACCEPT_CALL = 'ATA\r\n'
HANG_UP_CALL = 'ATH\r\n'
# a call is placed by sending ATDbfr + PHONE_TO_DIAL + ATDaft
ATDbfr = 'ATD'
PHONE_TO_DIAL = ''
# ***********************************************************************************
# put here phone where you want your messages to be forwarded
FORWARDING_NUMBER = '5551234567'
# ***********************************************************************************
ATDaft = ';\r\n'
SHUTDOWN_SIM868 = 'AT+CPOWD=1\r\n'      # expected response is "NORMAL POWER OFF"

# SMS handling; a message is started with SMSbfr + <number> + SMSaft
TXT_MODE = 'AT+CMGF=1\r\n'
SMSbfr = 'AT+CMGS="'
SMSaft = '"\r\n'
SMS_FULL_LIST = 'AT+CMGL="ALL"\r\n'
SMS_UNREAD_LIST = 'AT+CMGL="REC UNREAD"\r\n'
# SMS_SINGLE = "AT+CMGR=<index>"        (not used)
CLEAR_SMS_LIST = 'AT+CMGDA="DEL ALL"\r\n'

# status queries
ENABLE_CALL_ID = 'AT+CLIP=1\r\n'        # if caller has ID, will receive following: '+CLIP: "<number>",...'
CHECK_SIM = 'AT+CPIN?\r\n'              # returns '+CPIN: READY' if card is OK
CHECK_NETWORK = 'AT+CREG?\r\n'          # returns '+CREG: ...' if status OK, otherwise 'CME ERROR: <err>'
CHECK_OWN_NUMBER = 'AT+CNUM\r\n'
CHECK_CALL_STATUS = 'AT+CPAS\r\n'       # returns '+CPAS: <pas>', where 0=Ready, 3=Ring, 4=Call in progress

# tones played through the SIM868
DIAL_TONE = 'AT+STTONE=1,1,30000\r\n'   # american dial tone = 20
TUUU_TONE = 'AT+STTONE=1,2,30000\r\n'   # also check #7 or #18
RING_TONE = 'AT+STTONE=1,8,30000\r\n'   # 2 sec on and then 2-3 sec pause?
STOP_TONE = "AT+STTONE=0\r\n"

#******************************************************************************
# SIM868 initialization and routines

# make sure that /dev/ttyS0 is right for your setup
sim868 = serial.Serial("/dev/ttyS0", 115200)

# Header line of one entry in an AT+CMGL listing:
#   +CMGL: <index>,"<status>","<sender>",...
# group(1) is the status, group(2) the sender's number.
_SMS_HEADER = re.compile(r'^\+CMGL:\s*\d+,"([^"]*)","([^"]*)"')


def readSIM868():
    """Return everything currently waiting in the serial receive buffer.

    Keeps reading until no more bytes are pending, pausing 0.1 ms between
    reads.  Returns an empty string when the modem has sent nothing.
    """
    data = ""
    while sim868.inWaiting() > 0:
        data += sim868.read(sim868.inWaiting())
        time.sleep(0.0001)
    return data


def initSIM868():
    """Wake the SIM868 if needed and check SIM card and network status.

    Returns True only when the modem answered "READY" to the SIM check and
    "+CREG" to the network check.  As a side effect the modem is switched to
    SMS text mode and caller-ID presentation is enabled.
    """
    sim868.flushInput()
    sim868.write(AT_TEST)
    time.sleep(TENT_SEC)
    response = readSIM868()
    if "OK" not in response:
        # No answer to AT: pulse SIM868_run for 1.5 s and give the module
        # 30 s before talking to it (presumably SIM868_run drives the HAT's
        # power key -- confirm against the wiring).
        SIM868_run.on()
        time.sleep(1.5)
        SIM868_run.off()
        time.sleep(30.0)
    else:
        print("SIM868 already active, serial port opened")

    simOK = False
    netOK = False

    sim868.flushInput()
    sim868.write(CHECK_SIM)
    time.sleep(TENT_SEC)
    response = readSIM868()
    if "READY" in response:
        simOK = True
        print("SIM OK")
    else:
        print(response)

    sim868.write(CHECK_NETWORK)
    time.sleep(TENT_SEC)
    response = readSIM868()
    if "+CREG" in response:
        netOK = True
        print("NET OK")
    else:
        print(response)

    sim868.write(TXT_MODE)
    time.sleep(TENT_SEC)
    # idle() extracts the caller's number from the '+CLIP: "<number>",...'
    # line, which the modem only emits after AT+CLIP=1.  Nothing else in this
    # file sends that command, so do it here.
    sim868.write(ENABLE_CALL_ID)
    time.sleep(TENT_SEC)
    # discard the replies to the configuration commands
    sim868.flushInput()
    return (simOK and netOK)


def send_text(text, number):
    """Send `text` as an SMS to `number` and print the modem's reply.

    `text` may be a byte string or a unicode/text string.
    """
    # send SMS to number:
    sim868.write(SMSbfr + number + SMSaft)
    time.sleep(HALF_SEC)
    # no need to read response, just send text
    # A byte string is written unchanged: under Python 2, calling .encode()
    # on a byte string first decodes it as ASCII and raised
    # UnicodeDecodeError for any non-ASCII byte in a forwarded message.
    if not isinstance(text, bytes):
        text = text.encode("utf-8")
    sim868.write(text)
    sim868.write("\x1a\r\n")    # 0x1a : send   0x1b : Cancel send
    time.sleep(HALF_SEC)
    response = readSIM868()
    print(response)
    return


def _parse_unread_sms(response):
    """Return a list of (sender, text) for each "REC UNREAD" entry in `response`.

    `response` is the raw reply to an AT+CMGL listing.  Every entry starts
    with a '+CMGL:' header line; all non-empty lines up to the next header
    belong to that entry and are joined with single spaces.  Entries with any
    other status are skipped.
    """
    lines = [line.strip() for line in response.splitlines()]
    # drop the final result code so it is not taken for message text
    while lines and not lines[-1]:
        lines.pop()
    if lines and lines[-1] == "OK":
        lines.pop()

    messages = []
    sender = None       # sender of the unread entry being collected, if any
    body = []
    for line in lines:
        header = _SMS_HEADER.match(line)
        if header:
            if sender is not None:
                messages.append((sender, " ".join(body)))
            body = []
            if header.group(1) == "REC UNREAD":
                sender = header.group(2)
            else:
                sender = None
        elif sender is not None and line:
            body.append(line)
    if sender is not None:
        messages.append((sender, " ".join(body)))
    return messages


def receive_and_forward_text(number):
    """Forward every unread SMS stored on the modem to `number`.

    Each unread message is printed and then sent on as its own SMS.
    """
    # check SMS list
    sim868.write(SMS_FULL_LIST)     # (SMS_UNREAD_LIST)
    time.sleep(HALF_SEC)
    time.sleep(HALF_SEC)
    response = readSIM868()
    # if there is UNREAD message, forward it to number
    for phone, message in _parse_unread_sms(response):
        print(phone + " : " + message)
        send_text("incomming SMS from " + phone + " : " + message, number)
    return

# TBD:
# clear list
def clear_SMS_list():
    """Delete all SMS stored on the modem and print the modem's reply."""
    sim868.write(CLEAR_SMS_LIST)
    time.sleep(HALF_SEC)
    # no need to read response, but just clear the buffer
    response = readSIM868()
    print("clear SMS list: " + response)
    return

#******************************************************************************
# state machine functions
#

# Caller number inside a '+CLIP: "<number>",...' line.
_CLIP_NUMBER = re.compile(r'\+CLIP:\s*"([^"]*)"')


def idle():
    """Wait for something to happen while the phone is not in use.

    Polls twice per second and returns the next state:
      "DIAL" - handset lifted
      "RING" - the modem reported RING; PHONE_TO_DIAL is set to the caller's
               number, or "" when no caller ID was received
      "EXIT" - a dial turn of 12 pulses ("#") was completed
    While idle, unread SMS are forwarded to FORWARDING_NUMBER and the SMS
    store is cleared about once a minute.
    """
    global no_of_pulses
    global dial_turn_completed
    global PHONE_TO_DIAL
    timecnt = 0.0
    # by default, if no action taken, we will stay in this state
    next_state = "IDLE"
    while next_state == "IDLE":
        if handset_up.is_pressed:
            next_state = "DIAL"

        # if incoming call next_state = "RING"
        phone_status = readSIM868()
        if "RING" in phone_status:
            next_state = "RING"
            if "+CLIP" not in phone_status:
                # the caller-ID line can arrive just after the RING line;
                # give it one short chance to show up
                time.sleep(TENT_SEC)
                phone_status += readSIM868()
            print(phone_status)
            # Indexing into phone_status.split(" ") raised IndexError when
            # RING arrived without caller ID; search for the number instead.
            caller = _CLIP_NUMBER.search(phone_status)
            PHONE_TO_DIAL = caller.group(1) if caller else ""
            print(PHONE_TO_DIAL)

        # every 60 sec, check for incomming SMS
        # TBD: this can be done in different way - see +CMTI: "SM",5 ...
        if timecnt > 2 * PHONE_TIMEOUT:
            receive_and_forward_text(FORWARDING_NUMBER)
            clear_SMS_list()
            timecnt = 0.0

        if dial_turn_completed:
            # Consume every completed turn.  Turns that were left uncleared
            # kept adding up (e.g. 5 + 7 pulses) and could reach 12, shutting
            # the phone down by accident.
            pulses = no_of_pulses
            no_of_pulses = 0
            dial_turn_completed = False
            # if dialed "#" while headset is down next_state = EXIT
            if pulses == 12:
                next_state = "EXIT"
            # TBD: other processing, maybe use dial "*" (11 pulses) to send
            # status msg etc.

        # idle time, no need to be super fast
        time.sleep(HALF_SEC)
        timecnt = timecnt + HALF_SEC

    print(next_state)
    return next_state


def dial():
    """Collect a 10-digit number from the rotary dial.

    Plays the dial tone until the first digit is dialed.  Returns
      "CALL_OUT" - 10 digits collected (stored in PHONE_TO_DIAL)
      "IDLE"     - handset put down
      "TUUUU"    - PHONE_TIMEOUT elapsed before the number was complete
    """
    global no_of_pulses
    global dial_turn_completed
    global PHONE_TO_DIAL
    # start clean before starting dial
    no_of_pulses = 0
    dial_turn_completed = False
    PHONE_TO_DIAL = ""
    # by default, if no action taken, next state will be
    next_state = "TUUUU"
    dial_completed = False
    timecnt = 0.0
    sim868.write(DIAL_TONE)
    # if 30+ sec and dial not completed exit state
    while (timecnt < PHONE_TIMEOUT) and (not dial_completed):
        if not handset_up.is_pressed:
            # handset down always wins, even if the last digit was completed
            # in the same iteration
            next_state = "IDLE"
            dial_completed = True
        elif dial_turn_completed:
            sim868.write(STOP_TONE)
            # read number, append to the rest, check if whole number completed
            if no_of_pulses < 10:
                PHONE_TO_DIAL += str(no_of_pulses)
            elif no_of_pulses == 10:
                # ten pulses stand for the digit 0
                PHONE_TO_DIAL += str(0)
            # TBD: process "#/R" as Redial
            # TBD: process "*" as speed dial, in which case next dial would
            #      be associated with dial list number
            else:
                # disregard for now
                print("special char: " + str(no_of_pulses))
            # clear dial_turn_completed and wait for next number
            no_of_pulses = 0
            dial_turn_completed = False
            # if all entered -> dial completed, next_state = "CALL_OUT"
            if len(PHONE_TO_DIAL) == 10:
                next_state = "CALL_OUT"
                dial_completed = True
        timecnt = timecnt + TENT_SEC
        time.sleep(TENT_SEC)

    sim868.write(STOP_TONE)
    print(next_state)
    return next_state


def call_out():
    """Dial PHONE_TO_DIAL and stay in the call until the handset is put down.

    Always hangs up before returning.  Returns "IDLE".
    """
    # stay here until headset down or call completed
    next_state = "CALL_OUT"
    call_completed = False
    timecnt = 0.0
    print("calling number: " + PHONE_TO_DIAL)
    # dial the number once, then drain the modem's immediate reply
    sim868.write(ATDbfr + PHONE_TO_DIAL + ATDaft)
    time.sleep(TENT_SEC)
    readSIM868()
    while not call_completed:
        # if no answer in 30+ sec or
        # call not possible (SIM868 error) -> next_state = "TUUUU"
        # NOTE: the loop keeps running after this assignment and the
        # handset-down branch overwrites it, so it currently has no effect.
        # TBD: figure out how to determine that other side did not accept
        #      the call
        if timecnt > PHONE_TIMEOUT:
            next_state = "TUUUU"

        # if headset down next_state = "IDLE"
        if not handset_up.is_pressed:
            next_state = "IDLE"
            call_completed = True

        # prepare for next iteration
        timecnt = timecnt + HALF_SEC
        time.sleep(HALF_SEC)

    # in the end, make sure call is closed
    sim868.write(HANG_UP_CALL)
    time.sleep(TENT_SEC)
    readSIM868()
    print(next_state)
    return next_state


def ring():
    """Signal an incoming call and wait for the handset to be lifted.

    Pulses ring_on for half a second on entry and again on exit, and sends
    the caller's number to FORWARDING_NUMBER by SMS.  Returns "CALL_IN" if
    the handset is lifted within PHONE_TIMEOUT, otherwise "IDLE".
    """
    # by default, if no action taken, next state will be
    next_state = "IDLE"
    call_accepted = False
    timecnt = 0.0
    # ring tone
    #sim868.write(RING_TONE)
    ring_on.on()
    time.sleep(HALF_SEC)
    ring_on.off()
    send_text("incomming call from: " + PHONE_TO_DIAL, FORWARDING_NUMBER)
    # if 30+ sec and headset not lifted exit state
    # TBD: do we need longer ringing (60 sec), also verify mp3 duration
    while (timecnt < PHONE_TIMEOUT) and (not call_accepted):
        # if headset lifted next_state = "CALL_IN"
        if handset_up.is_pressed:
            next_state = "CALL_IN"
            call_accepted = True
        # prepare for next iteration
        timecnt = timecnt + HALF_SEC
        time.sleep(HALF_SEC)

    # close state
    #sim868.write(STOP_TONE)
    ring_on.on()
    time.sleep(HALF_SEC)
    ring_on.off()
    print(next_state)
    return next_state


def call_in():
    """Answer the incoming call and stay in it until the handset is put down.

    Always hangs up before returning.  Returns "IDLE".
    """
    # by default, if no action taken, next state will be
    next_state = "CALL_IN"
    call_completed = False
    # accept call here - ATA command
    sim868.write(ACCEPT_CALL)
    time.sleep(TENT_SEC)
    readSIM868()
    # stay here until headset down or call completed
    while not call_completed:
        # if call completed by other side
        # but handset not placed down -> next_state = "TUUUU"
        # TBD: determine how to recognize this

        # if headset down next_state = "IDLE"
        if not handset_up.is_pressed:
            next_state = "IDLE"
            call_completed = True
        time.sleep(TENT_SEC)

    # in the end, make sure call is closed
    sim868.write(HANG_UP_CALL)
    time.sleep(TENT_SEC)
    readSIM868()
    print(next_state)
    return next_state


def tuuuu():
    """Play the TUUU_TONE until the handset is put down.  Returns "IDLE"."""
    # by default, if no action taken, next state will still be
    next_state = "TUUUU"
    # produce mono-tone to signal state
    sim868.write(TUUU_TONE)
    while next_state == "TUUUU":
        # the only transition in this state is to IDLE, if headset down
        if not handset_up.is_pressed:
            next_state = "IDLE"
        time.sleep(HALF_SEC)

    # stop playing tuuuu
    sim868.write(STOP_TONE)
    print(next_state)
    return next_state


def shutdown_phone():
    """Power down the SIM868 and close the serial port.

    Returns "SHUT_DOWN", which is not a key of phone_state_machine, so the
    main loop ends.  Calling it again after the port was closed only prints
    the message.
    """
    next_state = "SHUT_DOWN"
    # this will result in main loop exiting and doing default() next
    print("shuting down SIM868")
    # The Ctrl+C handler may call this after the EXIT state already closed
    # the port; writing to a closed port would raise.
    if sim868.isOpen():
        sim868.write(SHUTDOWN_SIM868)
        time.sleep(HALF_SEC)
        response = readSIM868()
        print(response)
        sim868.close()
    return next_state


def default():
    """Final step after the state machine ends (Pi shutdown is disabled)."""
    print("shuting down Raspberry Pi")
    # TBD: uncomment below to allow shuting down R Pi
    #os.system("sudo shutdown now")
    time.sleep(HALF_SEC)
    return


# phone states and associated functions
phone_state_machine = {
    "IDLE": idle,
    "DIAL": dial,
    "CALL_OUT": call_out,
    "RING": ring,
    "CALL_IN": call_in,
    "TUUUU": tuuuu,
    "EXIT": shutdown_phone,
}

#******************************************************************************
# main
#
try:
    if initSIM868():
        while phone_state in phone_state_machine:
            # evaluate phone state to determine next action
            phone_state = phone_state_machine[phone_state]()
            time.sleep(TENT_SEC)
    # at the end, gracefully shutdown all
    default()
except KeyboardInterrupt:
    # stop execution for <Ctrl>+C
    print("stop phone app without shuting down Pi")
    shutdown_phone()