#!/usr/bin/python
#
# Created by dancejic.com (GD and ND).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details: <https://www.gnu.org/licenses/>.
#
# TITLE: rotaryPhoneStateMachine.py
# Description: Implementing rotary phone with Raspberry Pi and SIM868 HAT
# Usage: python rotaryPhoneStateMachine.py
#
# $Revision: \main\1 $
#
# Revision notes:
# rev 1 Adapted from initial implementation and prepared for Instructables
#
# CONTRIBUTORS:
# GD Goran Dancejic
# ND Nikola Dancejic
#******************************************************************************
import serial
import time
from gpiozero import LED, Button
import os
import re
#******************************************************************************
# common objects/constants/functions
#
# Name of the state the main loop will run next; the loop keeps going only
# while this value is a key of phone_state_machine.
phone_state = "IDLE"
# GPIO inputs (gpiozero Button objects).
# NOTE(review): pin numbers presumably use gpiozero's default BCM numbering --
# confirm against the actual wiring.
handset_up = Button(11)  # polled via is_pressed; pressed is treated as "handset lifted"
dial_turned = Button(26)  # its when_pressed handler is single_dial_turn_completed
dial_pulse = Button(19)  # its when_pressed handler is count_pulses
# GPIO outputs, both configured active-low and initially off.
ring_on = LED(25,active_high=False,initial_value=False)  # pulsed for half a second by ring()
SIM868_run = LED(10,active_high=False,initial_value=False)  # pulsed by initSIM868() when the modem does not answer "AT"
# Pulses counted by count_pulses() since the counter was last cleared.
no_of_pulses = 0
# Set to True by single_dial_turn_completed() when a turn ended with 1..12 pulses.
dial_turn_completed = False
def count_pulses():
    """Count one more dial pulse (handler for dial_pulse.when_pressed).

    A single turn is not expected to produce more than 12 pulses, so when the
    running total exceeds 12 an error is printed and the counter restarts
    from zero.
    """
    global no_of_pulses
    no_of_pulses = no_of_pulses + 1
    if no_of_pulses <= 12:
        return
    # More pulses than any dial position can produce: discard the reading.
    print("error reading pulses")
    no_of_pulses = 0
def single_dial_turn_completed():
    """Mark a dial turn as finished (handler for dial_turned.when_pressed).

    The flag is raised only when the pulse counter holds a value from 1 to 12;
    with any other count the event is ignored.
    """
    global dial_turn_completed
    if 0 < no_of_pulses <= 12:
        dial_turn_completed = True
# Hook the dial inputs up to their handlers: every press of dial_pulse counts
# one pulse, every press of dial_turned marks the end of a turn.
dial_pulse.when_pressed = count_pulses
dial_turned.when_pressed = single_dial_turn_completed
# Timing constants, in seconds (passed to time.sleep and compared with loop timers).
PHONE_TIMEOUT = 30.0
HALF_SEC = 0.5
TENT_SEC = 0.1  # one tenth of a second
# AT command strings written to the SIM868; each one ends with "\r\n".
AT_TEST = "AT\r\n"
ACCEPT_CALL = "ATA\r\n"
HANG_UP_CALL = "ATH\r\n"
# A call is dialed by sending ATDbfr + <number> + ATDaft.
ATDbfr = "ATD"
# Number being dialed, or the caller's number for an incoming call;
# rewritten by dial() and idle().
PHONE_TO_DIAL = ""
# ***********************************************************************************
FORWARDING_NUMBER = "5551234567" # set this to the phone number that should receive forwarded messages and call notices
# ***********************************************************************************
ATDaft = ";\r\n"
SHUTDOWN_SIM868 = "AT+CPOWD=1\r\n" # expected response is "NORMAL POWER OFF"
TXT_MODE = "AT+CMGF=1\r\n"
# An SMS is opened by sending SMSbfr + <number> + SMSaft (see send_text()).
SMSbfr = "AT+CMGS=\""
SMSaft = "\"\r\n"
SMS_FULL_LIST = "AT+CMGL=\"ALL\"\r\n"
SMS_UNREAD_LIST = "AT+CMGL=\"REC UNREAD\"\r\n"
#SMS_SINGLE = "AT+CMGR=
CLEAR_SMS_LIST = "AT+CMGDA=\"DEL ALL\"\r\n"
# NOTE(review): ENABLE_CALL_ID is defined here but is not sent by any code
# visible in this file.
ENABLE_CALL_ID = "AT+CLIP=1\r\n"
# if caller has ID, will receive following: '+CLIP: "",...'
CHECK_SIM = "AT+CPIN?\r\n"
# returns '+CPIN: READY' if card is OK
CHECK_NETWORK = "AT+CREG?\r\n"
# returns '+CREG: ...' if status OK, otherwise 'CME ERROR: '
CHECK_OWN_NUMBER = "AT+CNUM\r\n"
CHECK_CALL_STATUS = "AT+CPAS\r\n"
# returns '+CPAS: ', where 0=Ready, 3=Ring, 4=Call in progress
# Tone commands; per the original notes the alternatives listed are still to be tried.
DIAL_TONE = "AT+STTONE=1,1,30000\r\n" # american dial tone = 20
TUUU_TONE = "AT+STTONE=1,2,30000\r\n" # also check #7 or #18
RING_TONE = "AT+STTONE=1,8,30000\r\n" # 2 sec on and then 2-3 sec pause?
STOP_TONE = "AT+STTONE=0\r\n"
#******************************************************************************
# SIM868 initialization and routines
# make sure that /dev/ttyS0 is right for your setup
# Serial link to the SIM868, opened at 115200 baud as soon as this file runs;
# shared by every function below through the global name sim868.
sim868 = serial.Serial("/dev/ttyS0",115200)
def readSIM868():
    """Return everything the SIM868 has sent that is waiting on the serial port.

    Reads repeatedly while the port reports pending input, pausing 0.1 ms
    after each read before checking again. Returns an empty string when
    nothing is waiting.
    """
    global sim868
    received = ""
    while True:
        if sim868.inWaiting() <= 0:
            break
        received += sim868.read(sim868.inWaiting())
        # brief pause before checking for further input
        time.sleep(0.0001)
    return received
def initSIM868():
    """Bring up the SIM868 and verify the SIM card and network replies.

    Steps:
      1. Send a plain "AT". If no "OK" comes back, pulse SIM868_run for
         1.5 s and wait 30 s (presumably the modem's power-on sequence --
         confirm against the HAT documentation).
      2. Query the SIM; it counts as OK when the reply contains "READY".
      3. Query network registration; it counts as OK when the reply
         contains "+CREG".
      4. Switch SMS handling to text mode and discard any pending input.

    Replies that fail a check are printed for diagnosis.

    Returns:
        True only if both the SIM check and the network check passed.
    """
    global sim868

    # 1. is the modem already answering?
    sim868.flushInput()
    sim868.write(AT_TEST)
    time.sleep(TENT_SEC)
    if "OK" in readSIM868():
        print("SIM868 already active, serial port opened")
    else:
        SIM868_run.on()
        time.sleep(1.5)
        SIM868_run.off()
        time.sleep(30.0)

    # 2. SIM card status
    sim868.flushInput()
    sim868.write(CHECK_SIM)
    time.sleep(TENT_SEC)
    sim_reply = readSIM868()
    simOK = "READY" in sim_reply
    if simOK:
        print("SIM OK")
    else:
        print(sim_reply)

    # 3. network registration
    sim868.write(CHECK_NETWORK)
    time.sleep(TENT_SEC)
    net_reply = readSIM868()
    netOK = "+CREG" in net_reply
    if netOK:
        print("NET OK")
    else:
        print(net_reply)

    # 4. SMS text mode; its reply is discarded
    sim868.write(TXT_MODE)
    time.sleep(TENT_SEC)
    sim868.flushInput()
    return simOK and netOK
def send_text(text, number):
    """Send `text` as an SMS to `number` through the SIM868.

    Args:
        text: message body; it is encoded with str.encode() before sending.
        number: destination phone number as a string.

    The modem's final reply is printed; nothing is returned.
    """
    global sim868
    # Open the message with AT+CMGS="<number>"
    open_message = SMSbfr + number + SMSaft
    sim868.write(open_message)
    time.sleep(HALF_SEC)
    # The reply to the command above is not read; the body follows directly.
    sim868.write(text.encode())
    # 0x1a sends the message (0x1b would cancel it)
    sim868.write("\x1a\r\n")
    time.sleep(HALF_SEC)
    print(readSIM868())
def receive_and_forward_text(number):
    """Forward every unread SMS stored on the modem to `number`.

    Requests the full SMS list, waits one second for the reply, and for each
    entry marked UNREAD prints the sender and text and re-sends them with
    send_text(). The two lines following the entry header are joined to
    form the forwarded text.

    Args:
        number: phone number that receives the forwarded messages.
    """
    global sim868
    sim868.write(SMS_FULL_LIST)  # (SMS_UNREAD_LIST)
    time.sleep(HALF_SEC)
    time.sleep(HALF_SEC)
    listing = readSIM868()
    if "UNREAD" not in listing:
        return
    # group 1: sender, groups 2 and 3: the two lines after the header
    for entry in re.finditer(r'UNREAD","(.*)","".*\n(.*)\n(.*)\n', listing):
        sender, first_line, second_line = entry.group(1, 2, 3)
        body = first_line + second_line
        print(sender + " : " + body)
        send_text("incomming SMS from " + sender + " : " + body, number)
    # TBD: clear list
def clear_SMS_list():
    """Ask the SIM868 to delete all stored SMS messages.

    The modem's reply is read only to empty the serial buffer; it is printed
    after the prefix "clear SMS list: ".
    """
    global sim868
    sim868.write(CLEAR_SMS_LIST)
    time.sleep(HALF_SEC)
    print("clear SMS list: " + readSIM868())
#******************************************************************************
# state machine functions
#
def _caller_id_from_status(phone_status):
    """Return the number quoted in a '+CLIP: "<number>",...' report, or "" if there is none."""
    clip = re.search(r'\+CLIP:\s*"([^"]*)"', phone_status)
    return clip.group(1) if clip else ""


def idle():
    """IDLE state: handset down, waiting for something to happen.

    Polls twice a second until one of the following occurs:
      * the handset is lifted             -> "DIAL"
      * the modem reports "RING"          -> "RING" (wins over a lifted
        handset); the caller's number, if reported, goes to PHONE_TO_DIAL
      * 12 pulses are dialed ("#")        -> "EXIT"
    Roughly every 60 seconds unread SMS messages are forwarded to
    FORWARDING_NUMBER and the modem's SMS list is cleared.

    Returns:
        The name of the next state.
    """
    global no_of_pulses
    global dial_turn_completed
    global PHONE_TO_DIAL
    timecnt = 0.0
    # by default, if no action taken, we will stay in this state
    next_state = "IDLE"
    while next_state == "IDLE":
        if handset_up.is_pressed:
            next_state = "DIAL"
        # if incoming call next_state = "RING"
        phone_status = readSIM868()
        if "RING" in phone_status:
            next_state = "RING"
            print(phone_status)
            # "RING" can arrive without a +CLIP report (caller ID disabled,
            # withheld, or not yet received); fall back to an empty number
            # instead of failing on the missing field.
            PHONE_TO_DIAL = _caller_id_from_status(phone_status)
            print(PHONE_TO_DIAL)
        # every 60 sec, check for incoming SMS
        # TBD: this can be done in different way - see +CMTI: "SM",5 ...
        if timecnt > 2 * PHONE_TIMEOUT:
            receive_and_forward_text(FORWARDING_NUMBER)
            clear_SMS_list()
            timecnt = 0.0
        if dial_turn_completed:
            # if dialed "#" while handset is down next_state = EXIT
            if no_of_pulses == 12:
                next_state = "EXIT"
            # TBD: other processing, maybe use dial "*" (11 pulses) to send
            # status msg etc.
            # Consume the turn whatever was dialed: otherwise pulses from
            # successive turns add up and can reach 12 by accident.
            no_of_pulses = 0
            dial_turn_completed = False
        # idle time, no need to be super fast
        time.sleep(HALF_SEC)
        timecnt = timecnt + HALF_SEC
    print(next_state)
    return next_state
def dial():
    """DIAL state: play the dial tone and collect digits into PHONE_TO_DIAL.

    Each completed dial turn contributes one digit: 1-9 pulses give that
    digit, 10 pulses give "0"; 11 or 12 pulses are reported and ignored.
    The tone is stopped as soon as the first turn completes. The timer is
    not restarted between digits, so PHONE_TIMEOUT covers the whole number.

    Returns:
        "CALL_OUT" when ten digits have been collected,
        "IDLE" when the handset is put down (this takes precedence even if
        the tenth digit completes in the same poll),
        "TUUUU" when PHONE_TIMEOUT seconds pass without a complete number.
    """
    global no_of_pulses
    global dial_turn_completed
    global PHONE_TO_DIAL
    global sim868
    # start clean before starting dial
    no_of_pulses = 0
    dial_turn_completed = False
    PHONE_TO_DIAL = ""
    # by default, if no action taken, next state will be
    next_state = "TUUUU"
    dial_completed = False
    timecnt = 0.0
    sim868.write(DIAL_TONE)
    # if 30+ sec and dial not completed exit state
    while timecnt < PHONE_TIMEOUT and not dial_completed:
        handset_down = not handset_up.is_pressed
        if handset_down:
            next_state = "IDLE"
            dial_completed = True
        # check if number entered
        if dial_turn_completed:
            sim868.write(STOP_TONE)
            # read number, append to the rest
            if no_of_pulses < 10:
                PHONE_TO_DIAL += str(no_of_pulses)
            elif no_of_pulses == 10:
                PHONE_TO_DIAL += "0"
            # TBD: process "#/R" as Redial
            # TBD: process "*" as speed dial, in which case next dial would
            # be associated with dial list number
            else:
                # disregard for now; build one string so the message reads
                # the same whether print is a statement or a function
                print("special char: " + str(no_of_pulses))
            # clear dial_turn_completed and wait for next number
            no_of_pulses = 0
            dial_turn_completed = False
        # if all entered -> dial completed, next_state = "CALL_OUT";
        # never start a call once the handset has been put down
        if not handset_down and len(PHONE_TO_DIAL) == 10:
            next_state = "CALL_OUT"
            dial_completed = True
        timecnt = timecnt + TENT_SEC
        time.sleep(TENT_SEC)
    sim868.write(STOP_TONE)
    print(next_state)
    return next_state
def call_out():
    """CALL_OUT state: dial PHONE_TO_DIAL and stay until the handset is down.

    The number is dialed once, then the handset is polled every half second.
    Nothing in this function detects that the far end answered, so after
    PHONE_TIMEOUT seconds the pending result becomes "TUUUU" -- but the loop
    only ends when the handset is put down, which sets the result to "IDLE".
    The call is hung up before returning.

    Returns:
        The name of the next state.
    """
    global PHONE_TO_DIAL
    global sim868
    # stay here until handset down or call completed
    outcome = "CALL_OUT"
    answered = False  # TBD: figure out how to determine that the other side accepted
    elapsed = 0.0
    print("calling number: " + PHONE_TO_DIAL)

    # dial the number - done only once; the reply is read and discarded
    sim868.write(ATDbfr + PHONE_TO_DIAL + ATDaft)
    time.sleep(TENT_SEC)
    readSIM868()

    while True:
        # if no answer in 30+ sec or call not possible (SIM868 error)
        if elapsed > PHONE_TIMEOUT and not answered:
            outcome = "TUUUU"
        handset_down = not handset_up.is_pressed
        if handset_down:
            outcome = "IDLE"
        # prepare for next iteration
        elapsed = elapsed + HALF_SEC
        time.sleep(HALF_SEC)
        if handset_down:
            break

    # in the end, make sure call is closed
    sim868.write(HANG_UP_CALL)
    time.sleep(TENT_SEC)
    readSIM868()
    print(outcome)
    return outcome
def ring():
    """RING state: signal an incoming call and wait for the handset to be lifted.

    ring_on is pulsed for half a second on entry and again on exit
    (NOTE(review): presumably this starts and stops an external ringer --
    confirm with the hardware). A text naming the caller is sent to
    FORWARDING_NUMBER, then the handset is polled every half second for up
    to PHONE_TIMEOUT seconds.

    Returns:
        "CALL_IN" if the handset was lifted in time, otherwise "IDLE".
    """
    global sim868
    # the modem ring tone (RING_TONE) is not used; ring_on is pulsed instead
    ring_on.on()
    time.sleep(HALF_SEC)
    ring_on.off()
    send_text("incomming call from: " + PHONE_TO_DIAL, FORWARDING_NUMBER)

    # TBD: do we need longer ringing (60 sec), also verify mp3 duration
    outcome = "IDLE"
    waited = 0.0
    while waited < PHONE_TIMEOUT and outcome == "IDLE":
        if handset_up.is_pressed:
            outcome = "CALL_IN"
        # prepare for next iteration
        waited = waited + HALF_SEC
        time.sleep(HALF_SEC)

    # close state with a second pulse
    ring_on.on()
    time.sleep(HALF_SEC)
    ring_on.off()
    print(outcome)
    return outcome
def call_in():
    """CALL_IN state: answer the incoming call and stay until the handset is down.

    Sends the answer command, then polls the handset every tenth of a second.
    The call is hung up once the handset is put down.
    TBD: recognise a call ended by the other side while the handset is still
    up (should lead to "TUUUU").

    Returns:
        "IDLE" -- the only exit from this state is putting the handset down.
    """
    global sim868
    # accept call here - ATA command; the reply is read and discarded
    sim868.write(ACCEPT_CALL)
    time.sleep(TENT_SEC)
    readSIM868()

    # stay here while the handset is up
    while handset_up.is_pressed:
        time.sleep(TENT_SEC)
    outcome = "IDLE"
    time.sleep(TENT_SEC)

    # in the end, make sure call is closed
    sim868.write(HANG_UP_CALL)
    time.sleep(TENT_SEC)
    readSIM868()
    print(outcome)
    return outcome
def tuuuu():
    """TUUUU state: play a continuous tone until the handset is put down.

    The handset is polled every half second; the tone is stopped on exit.

    Returns:
        "IDLE" -- the only transition out of this state.
    """
    global sim868
    # produce mono-tone to signal state
    sim868.write(TUUU_TONE)
    while handset_up.is_pressed:
        time.sleep(HALF_SEC)
    outcome = "IDLE"
    time.sleep(HALF_SEC)
    # stop playing the tone
    sim868.write(STOP_TONE)
    print(outcome)
    return outcome
def shutdown_phone():
    """EXIT state: power the SIM868 down and close the serial port.

    Returns:
        "SHUT_DOWN", which is not a key of phone_state_machine, so the main
        loop stops and default() runs next.
    """
    global sim868
    print("shuting down SIM868")
    sim868.write(SHUTDOWN_SIM868)
    time.sleep(HALF_SEC)
    print(readSIM868())
    sim868.close()
    return "SHUT_DOWN"
def default():
    """Final step after the state machine stops.

    Announces the Raspberry Pi shutdown and waits half a second; the actual
    shutdown command is still disabled.
    """
    print("shuting down Raspberry Pi")
    # TBD: uncomment below to allow shuting down R Pi
    #os.system("sudo shutdown now")
    time.sleep(HALF_SEC)
# phone states and associated functions
# Each handler runs until its state ends and returns the name of the next
# state. A returned name that is not a key here (shutdown_phone returns
# "SHUT_DOWN") ends the main loop.
phone_state_machine = {
    "IDLE" : idle,
    "DIAL" : dial,
    "CALL_OUT" : call_out,
    "RING" : ring,
    "CALL_IN" : call_in,
    "TUUUU" : tuuuu,
    "EXIT" : shutdown_phone
}
#******************************************************************************
# main
#
try:
    # Run the state machine only if the modem, SIM and network checks passed.
    if initSIM868():
        while phone_state in phone_state_machine:
            # look up the handler for the current state and run it;
            # its return value names the next state
            handler = phone_state_machine[phone_state]
            phone_state = handler()
            time.sleep(TENT_SEC)
    # at the end, gracefully shutdown all
    default()
except KeyboardInterrupt:
    # Ctrl+C: stop the app and power down the modem, but leave the Pi running
    print("stop phone app without shuting down Pi")
    shutdown_phone()