Integrate Banana Phone scripts

We now come to the conclusion of our Banana Phone build. The scripts are written in Python and will be stored in some new directories we will create within the 'banana' home directory. We'll then add the needed media files for Asterisk to play when a call is received, and finish out with setting up the database for remembering validated caller IDs and some test calls to make sure everything works.

Banana Phone directory structure


We have a few Python scripts to copy into our 'banana' user's home directory, so we will make a few more directories to keep things organized.

Log in to your Asterisk server as 'banana', move into the 'banana_phone' directory in the 'banana' user's home directory, and create the following:

cd ~/banana_phone
mkdir audio_clips
mkdir audio_clips/slns
mkdir scripts
mkdir auth_audio

Now move into the 'scripts' directory you just created.

cd scripts/

Copy Banana Phone scripts


There are eleven Python scripts we must copy into our directory so Banana Phone will do its job. They require some database work with SQLAlchemy, so we'll install that now.

sudo pip install sqlalchemy

I'm gonna just list the python files for copying individually here. I won't go over what the scripts do in detail. That is left as an exercise for the reader.

database_maps.py

This defines our database tables within SQLAlchemy for a basic call log and whitelist.

nano database_maps.py

import datetime
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

# define the whitelist table
class WhiteList(Base):
    __tablename__ = 'whitelist'

    id = Column(Integer, primary_key=True)
    number = Column(Integer)
    timestamp = Column(DateTime, default=datetime.datetime.utcnow)

    def __repr__(self):
        return "<Whitelist CID: %s>" % self.number

class CallLog(Base):
    __tablename__ = 'log'

    id = Column(Integer, primary_key=True)
    number = Column(String)
    name = Column(String)
    timestamp = Column(DateTime, default=datetime.datetime.utcnow)
   
    def __repr__(self):
        return "<Call Log entry: %s - %s - %s>" % (self.name, self.number, self.timestamp)

database_helpers.py

Contains a helper function for creating a session in sqlalchemy for needed database operations.

nano database_helpers.py

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

def create_session():
    engine = create_engine('sqlite:////home/banana/banana_phone/banana_phone.db')
    Session = sessionmaker(bind=engine)
    session = Session()
    session._model_changes = {}
    return session  
  

setup_database.py

We will run this script to create the SQLite database needed for storing the CallLog and WhiteList tables. Don't run this now.

nano setup_database.py

import datetime
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from database_helpers import create_session

engine = create_engine('sqlite:////home/banana/banana_phone/banana_phone.db', echo=True)

#initialize your Base for building models
Base = declarative_base()

# define the whitelist table
class WhiteList(Base):
    __tablename__ = 'whitelist'

    id = Column(Integer, primary_key=True)
    number = Column(Integer)
    timestamp = Column(DateTime, default=datetime.datetime.utcnow)

    def __repr__(self):
        return "<Whitelist CID: %s>" % self.number

class CallLog(Base):
    __tablename__ = 'log'

    id = Column(Integer, primary_key=True)
    # String, to match database_maps.py (add_log.py can store text like 'No Caller ID')
    number = Column(String)
    name = Column(String)
    timestamp = Column(DateTime, default=datetime.datetime.utcnow)

    def __repr__(self):
        return "<Call Log entry: %s - %s - %s>" % (self.name, self.number, self.timestamp)

Base.metadata.create_all(engine)

CID_helpers.py

These are a collection of simple data validation functions for incoming caller ID numbers from Asterisk.

It is by no means extensive, but it covers the basics and is pretty simple to follow.

nano CID_helpers.py

import re

def valid_digits( cid ):
    ###################################################
    #
    # --- check incoming caller ID only contains digits
    #
    # input: caller ID string from Asterisk 
    #
    # output: boolean 
    #
    ####################################################

    # ensure var is string
    str_cid = str(cid)
    #print str_cid

    # return True only when every character is a digit
    return str_cid.isdigit()

def cid_visible( cid ):
    #######################################
    #
    # --- check if caller ID is present ---
    #
    # input: caller ID string from Asterisk
    #
    # output: boolean
    #
    #######################################

    # print cid

    # ensure var is string
    str_cid = str(cid)

    if (str_cid == "Unknown"):
        #print "Blank CID - do not save"
        return False
    elif (str_cid == ''):
        #print "blank CID - actual blank string is registered for number - weird"
        return False
    else:
        #print "CID is present"
        return True

def valid_length( cid ):
    #####################################################
    #
    # --- check that caller ID is the correct length ---
    # either 7, 10 or 11 digits only
    #
    # input: caller ID string from Asterisk
    #
    # output:  boolean
    #
    #####################################################

    # print cid

    # ensure var is string
    str_cid = str(cid)

    if (len(str_cid) == 7):
        #print "valid CID length - 7 digits, local number"
        return True
    elif (len(str_cid) == 11):
        return True
    elif (len(str_cid) == 10):
        #print "valid CID length - 10 digits, full number"
        return True
    else:
        #print "number is not standard, probs fake: length is %s" % (len(str_cid))
        return False

def non_repeating_cid( cid ):
    ##################################################################
    #
    # --- check repeating numbers in phone number ---
    # ex. "(111) 111-1111"
    # 
    # input: caller ID string from Asterisk
    #
    # output: boolean
    #
    #################################################################

    # ensure var is string
    str_cid = str(cid)

    searchObj = re.search(r'\b(\d)\1+\b', str_cid)
    # if returns, cid is repeating number, throw false
    if searchObj:
        return False
    else:
        return True
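
To get a feel for how these four checks behave together, here is a condensed, self-contained sketch. It is not a replacement for CID_helpers.py; the function name looks_legit and the sample numbers are made up for illustration.

```python
import re

def looks_legit(cid):
    """Condensed restatement of the four CID_helpers checks (illustrative only)."""
    cid = str(cid)
    visible = cid not in ("Unknown", "")       # same idea as cid_visible
    digits_only = cid.isdigit()                # same idea as valid_digits
    good_length = len(cid) in (7, 10, 11)      # same idea as valid_length
    # same idea as non_repeating_cid: reject a number made of one repeated digit
    not_repeating = re.search(r'\b(\d)\1+\b', cid) is None
    return visible and digits_only and good_length and not_repeating

# hypothetical sample caller IDs
for sample in ("5551234567", "Unknown", "1111111111", "555-1234", "12345"):
    print("%s -> %s" % (sample, looks_legit(sample)))
```

Only the first sample passes every check; each of the others trips at least one of them.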

verify_CID.py

This script validates that a caller ID looks legit enough to be put on the WhiteList. It's used after the call passes the human test.

nano verify_CID.py

import argparse
from CID_helpers import *

parser = argparse.ArgumentParser()
parser.add_argument("cid", help="incoming caller id")
args = parser.parse_args()

#####################################
#
# --- check all the CID helpers ---
#
# if they all are true, the CID is good for saving
#
# input: caller ID string from Asterisk
#
# output: text string "YES" or "NO"
#
# --- if this output looks weird, it's because it totally is. In order to 
# --- "massage" Asterisk into being scripted from the dialplan, we print out strings
# --- and use the Shell() application in the dialplan to extract the echoed string 
# --- to return the value into a dialplan variable.
# ---
# --- This is way stable and I opted for this because shitty AGI locks up 
#  
#####################################

incoming_cid = args.cid


visible_number = cid_visible(incoming_cid)

good_length = valid_length(incoming_cid)

all_digits = valid_digits(incoming_cid)

non_repeat = non_repeating_cid(incoming_cid)


# print() with parentheses runs under both Python 2 and Python 3
if visible_number and good_length and all_digits and non_repeat:
    print('YES')
else:
    print('NO')
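
You can try the print-and-capture trick outside of Asterisk. The sketch below assumes nothing about Asterisk itself: it only mimics what Shell() does by running a tiny stand-in child process and reading its standard output. The helper name run_and_capture and the inline child script are hypothetical.

```python
import subprocess
import sys

def run_and_capture(argv):
    """Run a command and return whatever it printed, minus the trailing newline."""
    output = subprocess.check_output(argv)
    return output.decode("utf-8").strip()

# stand-in for: python verify_CID.py <cid>
child_script = "import sys; print('YES' if sys.argv[1].isdigit() else 'NO')"
child = [sys.executable, "-c", child_script, "5551234567"]

answer = run_and_capture(child)
print(answer)
```

The captured string is what ends up in the dialplan variable, which is why the scripts print a bare YES or NO and nothing else on the happy path.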

add_log.py

This saves the caller ID info and the time the call came in to the CallLog table. None of the Banana Phone screening decisions depend on it, but I can't leave this obvious feature out. You should always keep logs.

nano add_log.py

import argparse
import datetime
from CID_helpers import *
from database_maps import CallLog
from database_helpers import create_session

###################################################
#
# --- add caller ID string to CallLog ---
#
# input: caller ID number, caller ID name, epoch timestamp of call
# 
# output: will show a string rep of the record that's saved in the Asterisk debug console
#
################################################### 


# prep cli args for use
parser = argparse.ArgumentParser()
parser.add_argument("cid", help="incoming caller id")
parser.add_argument("name", help="incoming caller name")
parser.add_argument("timestamp", help="call timestamp")
args = parser.parse_args()

cid_value = ''
name_value = ''
call_time = datetime.datetime.fromtimestamp(float(args.timestamp))
#print call_time

#check for blank cid, if so, put in filler string for logs
shown_cid = cid_visible(args.cid)
if not shown_cid:
    cid_value = 'No Caller ID'
else:
    cid_value = args.cid

# check for blank name, fill if blank
if args.name == '':
    name_value = 'No Name'
else:
    name_value = args.name

print("entry for log: %s - %s - %s" % (cid_value, name_value, call_time))

try:
    session = create_session()
    #create a call log entry obj
    log_add = CallLog(number=cid_value, name=name_value, timestamp=call_time)
    session.add(log_add)
    session.commit()
except Exception as e:
    print(e)
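
One detail worth knowing: the timestamp arrives from the dialplan as a string of epoch seconds, and fromtimestamp() turns it into a datetime in the server's local time zone, whereas the column default in database_maps.py uses UTC. Here is a small Python 3 sketch of both conversions. The function names and the sample value are made up for illustration.

```python
import datetime

def parse_call_time(epoch_string):
    """Convert an epoch-seconds string to a naive datetime in local time."""
    return datetime.datetime.fromtimestamp(float(epoch_string))

def parse_call_time_utc(epoch_string):
    """Same conversion, but as a timezone-aware datetime in UTC."""
    return datetime.datetime.fromtimestamp(float(epoch_string), tz=datetime.timezone.utc)

sample = "1500000000"   # hypothetical value of ${CallTimestamp}
print(parse_call_time(sample))
print(parse_call_time_utc(sample))
```

If your server isn't set to UTC, the two lines will differ by your UTC offset, which is something to keep in mind when reading the log.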

add_whitelist.py

This adds a caller ID string to the WhiteList table. This is called after the verify_CID script is run to ensure the CID being added is clean.

nano add_whitelist.py

import datetime
import argparse
from database_maps import WhiteList
from database_helpers import create_session

###########################################
#
# --- add caller ID to WhiteList table ---
#
# input: caller ID number, call timestamp
#
# output: nothing unless there is an error
#
###########################################

parser = argparse.ArgumentParser()
parser.add_argument("cid", help="incoming caller id")
parser.add_argument("timestamp", help="call timestamp")
args = parser.parse_args()

call_time = datetime.datetime.fromtimestamp(float(args.timestamp))

try:
    session = create_session()
    #create a white list entry obj
    number_add = WhiteList(number=args.cid, timestamp=call_time)
    #print number_add
    # add to white list
    session.add(number_add)
    session.commit()
except Exception as e:
    print(e)

check_whitelist.py

This does exactly what it sounds like: takes a caller ID number as input, says if it's in the whitelist or not.

nano check_whitelist.py

import argparse
from database_maps import WhiteList
from database_helpers import create_session

##############################################
#
# --- checks if caller ID is on WhiteList ---
# 
# input: caller ID string from Asterisk
#
# output: string "YES" or "NO"
#
#############################################

parser = argparse.ArgumentParser()
parser.add_argument("cid", help="incoming caller id")
args = parser.parse_args()

# default to "not found" so a database error still ends in a clean "NO"
check_wl = None

try:
    session = create_session()
    check_wl = session.query(WhiteList).filter_by(number=args.cid).first()
except Exception as e:
    print(e)

if check_wl is None:
    print("NO")
else:
    print("YES")
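
If you want to see the lookup without the ORM layer, here is a rough equivalent using Python's built-in sqlite3 module against an in-memory database. It is a stand-in, not what check_whitelist.py does internally. The helper name is_whitelisted and the sample numbers are invented, and the table layout is only assumed to mirror the WhiteList model.

```python
import sqlite3

def is_whitelisted(conn, cid):
    """Return "YES" if cid has a row in the whitelist table, otherwise "NO"."""
    row = conn.execute(
        "SELECT id FROM whitelist WHERE number = ? LIMIT 1", (cid,)
    ).fetchone()
    return "NO" if row is None else "YES"

# throwaway in-memory database with one whitelisted number
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE whitelist (id INTEGER PRIMARY KEY, number INTEGER, timestamp DATETIME)"
)
conn.execute("INSERT INTO whitelist (number) VALUES (?)", ("5551234567",))
conn.commit()

print(is_whitelisted(conn, "5551234567"))
print(is_whitelisted(conn, "5559999999"))
```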

generate_auth_audio.py

Banana Phone generates audio clips with random 4-digit passwords embedded in them. The first part of that process starts with generating the 4-digit code audio and saving it to a place for later retrieval.

nano generate_auth_audio.py

import subprocess
import argparse
import os
from random import randint


####################################################################
#
# --- generate a sox string command to create an 
# --- audio file containing a randomly generated 4-digit code
#
# input: timestamp of call in epoch format, 4 digit integer
#
# output: string containing a sox command to mix together 
# a new file from Asterisk audio resources
#
#####################################################################

parser = argparse.ArgumentParser()
parser.add_argument("timestamp", help="call timestamp")
parser.add_argument("auth", help="auth code")
args = parser.parse_args()

#print '--------'
#print args.timestamp
ts_file = args.timestamp + '.sln'
#print 'timestamp file: ' + ts_file 
#print '--------'
#print args.auth

audio_dir = '/home/banana/banana_phone/audio_clips/slns'
auth_audio_dir = '/home/banana/banana_phone/auth_audio'
#file name tuples
digits_tup = ('0.sln', '1.sln', '2.sln', '3.sln', '4.sln', '5.sln', '6.sln', '7.sln', '8.sln', '9.sln')


# empty list for holding generated strings
file_list = []
# convert auth int into string
auth_code = str(args.auth)

#loop through auth password, adding files to list for sox to work on
for num in auth_code[:4]:
    path = os.path.join(audio_dir, digits_tup[int(num)])
    file_list.append(path)

final_path = os.path.join(auth_audio_dir, ts_file)

#print "final output path: " + final_path

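# NOTE: silence_path is used below but was never defined in this listing.
# The file name here is an assumption; point it at the silence clip you
# placed in audio_clips/slns.
silence_path = os.path.join(audio_dir, 'silence.sln')

# assemble the final sox command
cmd_string = 'sox %s %s %s %s %s %s' % (silence_path, file_list[0], file_list[1], 
                                       file_list[2], file_list[3], final_path)

# print out string for SHELL() to extract into dialplan
print(cmd_string)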
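
The heart of this script is the mapping from each digit of the auth code to a clip path. Here is a stripped-down sketch of just that step. It returns a list of paths instead of a full command string and leaves out the leading silence clip. The function name digit_clip_paths and the sample code 4071 are hypothetical.

```python
import os

AUDIO_DIR = '/home/banana/banana_phone/audio_clips/slns'

def digit_clip_paths(auth_code, audio_dir=AUDIO_DIR):
    """Map the first four digits of auth_code to their .sln clip paths, in order."""
    code = str(auth_code)[:4]
    if len(code) != 4 or not code.isdigit():
        raise ValueError("auth code must be four digits: %r" % (auth_code,))
    return [os.path.join(audio_dir, digit + '.sln') for digit in code]

# hypothetical auth code
for path in digit_clip_paths(4071):
    print(path)
```

The extra validation is my own addition. The dialplan only ever passes a value between 1000 and 9999, so the original script gets away without it.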

generate_background_audio.py

Once the auth code audio file is created, this script mixes it with background music and drops the result into the 'asterisk_resources/sounds' folder.

nano generate_background_audio.py

import argparse
import os
from random import randint

###################################################################
#
# --- mixes the auth code sound file over background music ---
#
# input: timestamp of current call
#
# output: string with sox command for mixing audio files,
#         drops file in custom asterisk sound directory
#
###################################################################

parser = argparse.ArgumentParser()
parser.add_argument("timestamp", help="call timestamp")
args = parser.parse_args()

ts_file = args.timestamp + '.sln'
audio_dir = '/home/banana/banana_phone/audio_clips/slns'
auth_audio_dir = '/home/banana/banana_phone/auth_audio'
final_audio_dir = '/home/banana/banana_phone/asterisk_resources/sounds'

bk_tup = ('bk_0.sln', 'bk_1.sln', 'bk_2.sln')

# empty list for holding generated strings
file_list = []

# pick a number 0 - 2 (randint is inclusive), use that as the background audio
bk_choice = randint(0,2)
bk_path = os.path.join(audio_dir, bk_tup[bk_choice])

final_path = os.path.join(final_audio_dir, ts_file)

auth_audio_file = os.path.join(auth_audio_dir, ts_file)

# assemble command string and print out for Shell() extraction 
cmd_string = 'sox -m %s %s %s' % (bk_path, auth_audio_file, final_path)
print(cmd_string)
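
As a variation, random.choice() picks straight from the tuple, so the random range can never drift out of step with the number of background clips. This is a sketch of that idea only, not the script above. The function name mix_command is made up, and it builds an argument list rather than a string.

```python
import os
import random

BK_CLIPS = ('bk_0.sln', 'bk_1.sln', 'bk_2.sln')
BASE_DIR = '/home/banana/banana_phone'

def mix_command(timestamp, rng=random):
    """Build the sox arguments that mix a random background clip with the auth clip."""
    ts_file = timestamp + '.sln'
    bk_path = os.path.join(BASE_DIR, 'audio_clips', 'slns', rng.choice(BK_CLIPS))
    auth_path = os.path.join(BASE_DIR, 'auth_audio', ts_file)
    final_path = os.path.join(BASE_DIR, 'asterisk_resources', 'sounds', ts_file)
    return ['sox', '-m', bk_path, auth_path, final_path]

# hypothetical call timestamp
print(' '.join(mix_command('1500000000')))
```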

show_log.py

This isn't used in the actual Banana Phone logic, but it's a nice little utility script for checking that our CallLog table is being written to correctly.

nano show_log.py

from database_maps import CallLog
from database_helpers import create_session

def log_dump():
    session = create_session()
    check_wl = session.query(CallLog).all()
    return check_wl

log = log_dump()

for entry in log:
    print(entry)

With a few quick modifications, this script can be used to dump the whitelist entries as well. Just replace every reference to "CallLog" with "WhiteList".

Run setup_database.py


In the same 'scripts' directory, run the setup_database script.

python setup_database.py

You should now have a database file in the 'banana_phone' directory that will contain our CallLog and WhiteList database records.
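
If you want to confirm the tables were actually created, one option is to peek at the database with Python's built-in sqlite3 module. The sketch below is a convenience of my own, not part of Banana Phone. list_tables is a made-up helper, and the demo runs against a throwaway in-memory database, so it's safe to try anywhere.

```python
import sqlite3

def list_tables(conn):
    """Return the names of all tables in the connected SQLite database, sorted."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
    ).fetchall()
    return [name for (name,) in rows]

# demo against an in-memory database using the same two table names
demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE whitelist (id INTEGER PRIMARY KEY, number INTEGER, timestamp DATETIME)")
demo.execute("CREATE TABLE log (id INTEGER PRIMARY KEY, number VARCHAR, name VARCHAR, timestamp DATETIME)")
print(list_tables(demo))
```

For the real thing, pass sqlite3.connect('/home/banana/banana_phone/banana_phone.db') instead. You should see the 'log' and 'whitelist' tables listed.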

Database file and directory permissions


Banana Phone uses SQLite as its database for storing call logs and the whitelist. SQLite needs the database file's parent folder to be writable as well, because it creates its temporary journal files alongside the database.

I didn't wanna deal with making weird groups and all that bebop, so I just set the permissions on the 'banana_phone' directory and the database file to 777 (boo, hiss).

chmod 777 /home/banana/banana_phone/banana_phone.db
chmod 777 /home/banana/banana_phone
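
To double-check that the permissions took, something like the following sketch can help. It asks the operating system whether the current user can write to the database file and its parent directory. The helper name sqlite_write_check is hypothetical, and the demo uses a temporary stand-in file rather than the real database.

```python
import os
import tempfile

def sqlite_write_check(db_path):
    """Report whether the current user can write the database file and its parent directory."""
    parent = os.path.dirname(os.path.abspath(db_path))
    return {
        'file_exists': os.path.isfile(db_path),
        'file_writable': os.access(db_path, os.W_OK),
        'dir_writable': os.access(parent, os.W_OK | os.X_OK),
    }

# demo with a temporary stand-in for banana_phone.db
tmp_dir = tempfile.mkdtemp()
tmp_db = os.path.join(tmp_dir, 'banana_phone.db')
open(tmp_db, 'w').close()
print(sqlite_write_check(tmp_db))
```

Run it as the same user Asterisk runs as, with the real path /home/banana/banana_phone/banana_phone.db, since that is the account that has to write to the file.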

Audio files for Banana Phone


In my original prototype, I used a sound clip from that famous video from the Internet days of yore.

I don't think I can legally hand out copies of that song on my site, so we're just gonna use some royalty-free music courtesy of Bensound.com for the background music.

We also need some sound clips of the numbers 0-9, plus some explanatory intro speech and instructional speech, so I used Google's free Text-to-Speech service courtesy of soundoftext.com for those files.

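Download the digit clips (0.sln through 9.sln), the background music clips (bk_0.sln, bk_1.sln and bk_2.sln) and the silence clip used by generate_auth_audio.py, and place them in the /home/banana/banana_phone/audio_clips/slns directory we made earlier.

Place the prompt files that the dialplan plays (initial_greet, announcement, at_the_tone, only_once and verified) in the /home/banana/banana_phone/asterisk_resources/sounds directory.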

Make sure they're all owned by the 'banana' user.

The Banana Phone dialplan


This is where Asterisk and our database scripts all come together to create our robocall blocker.

Here is a flow chart showing the logic flow for the Banana Phone dialplan.

 

I'm not doing any techno-sorcery here. It's really that simple.

Now actually getting Asterisk to do all those tricks is another story.

Asterisk has enough input flexibility within its dialplan applications that we can get programmatic call control without using AGI or AMI. And that's great, because the dialplan is the most stable programming option of the three.

* Please note: commands in the dialplan are meant to be on single lines, but the code highlighting generator I used wrapped some of the longer ones (mostly the Set() and System() lines that call the Python scripts) onto two lines. If you see a command split that way, put it back on a single line.

Now here's the code for the Banana Phone dialplan. Open up /etc/asterisk/extensions.conf and replace our original, minimalist dialplan with this:

[globals]
APP_DIR=/home/banana/banana_phone/
SCRIPTS_DIR=/home/banana/banana_phone/scripts/

[general]
exten => _X.,1,Answer()
    same => n,NoOp("generic catch-all extension")
    same => n,NoOp(${EXTEN})
    same => n,HangUp()

[PhoneHandset]
exten => 222,1,Answer()
    same => n,VoiceMailMain(@banana-vm)

exten => _X.,1,Answer()
    same => n,NoOp("in PhoneHandset extension")
    same => n,NoOp("dialed number: ${EXTEN}")
    same => n,Dial(SIP/${EXTEN}@PhoneLine)

[PhoneLine]
exten => 5555555555,1,NoOp("call registered")
    same => n,Answer()
    same => n,Set(CallTimestamp=${STRFTIME(${EPOCH},,%s)})
    same => n,Background(initial_greet)
    same => n,System(python ${SCRIPTS_DIR}add_log.py ${CALLERID(num)} '${CALLERID(name)}' ${CallTimestamp})
    same => n,Set(result=${SHELL(python ${SCRIPTS_DIR}check_whitelist.py ${CALLERID(num)})})
    same => n,Set(inWL=${REGEX("YES" ${result})})
    same => n,NoOp(CID in DB: ${inWL})
    same => n,GotoIf($["${inWL}" = "1"]?liveline:banana)
    same => n(liveline),NoOp("passing to live line")
    same => n,Dial(SIP/PhoneHandset)
    same => n,VoiceMail(PhoneHandset@banana-vm,u)
    same => n,HangUp()
    same => n(banana),Goto(BananaPhone,start,1)
    same => n,HangUp()

[BananaPhone]
exten => start,1,Answer()
    same => n,NoOp("suspect call in trapline")
    same => n,Set(IncomingCID=${CALLERID(num)})
    same => n,Set(CallTimestamp=${STRFTIME(${EPOCH},,%s)})
    same => n,Set(auth=${RAND(1000,9999)})
    same => n,NoOp("generated auth: ${auth}")
    same => n,Playback(announcement)
    same => n,Playback(at_the_tone)
    same => n,Playback(only_once)
    same => n,Set(res=${SHELL(python ${SCRIPTS_DIR}generate_auth_audio.py ${CallTimestamp} ${auth})})
    same => n,NoOp(${res})
    same => n,System(${res})
    same => n,Set(clip=${SHELL(python ${SCRIPTS_DIR}generate_background_audio.py ${CallTimestamp})})
    same => n,NoOp(${clip})
    same => n,System(${clip})
    same => n,NoOp("This is after the end of the clip")
    same => n,Read(auth_input,${CallTimestamp},4,s,1,8)
    same => n,NoOp(grabbed input: ${auth_input})
    same => n,GotoIf($["${auth_input}" = "${auth}"]?success:hangup)
    same => n(success),Playback(verified)
    same => n,Set(ver=${SHELL(python ${SCRIPTS_DIR}verify_CID.py ${CALLERID(num)} )})
    same => n,NoOp(${ver})
    same => n,Set(addWL=${REGEX("YES" ${ver})})
    same => n,NoOp(${addWL})
    same => n,GotoIf($["${addWL}" = "1"]?add:liveline)
    same => n(hangup),HangUp()
    same => n(add),System(python ${SCRIPTS_DIR}add_whitelist.py ${CALLERID(num)} ${CallTimestamp} )
    same => n(liveline),Dial(SIP/PhoneHandset)
    ;same => n(liveline),NoOp("Success end of authentication")
    same => n,HangUp()

In the [PhoneLine] context, replace "5555555555" in the first exten line with the 10-digit phone number of the phone line plugged into the Obi ATA.

There are a couple of subtle dialplan nuances used here, particularly when generating the audio files and when the caller is actually entering the input from their phone. I will leave that for the reader to dissect if they feel so inclined. Google is your friend.

I also peppered in a bunch of NoOp() calls as debug messages you can see in the console.
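
If it helps to see the decision flow without the dialplan syntax, here is how I'd sketch it in plain Python. This is a simplified model only: the function name route_call and the step labels are invented, and it leaves out the voicemail fallback and the audio generation.

```python
def route_call(on_whitelist, entered_code, expected_code, cid_is_valid):
    """Return the list of steps the dialplan takes for one call (simplified)."""
    if on_whitelist:
        # [PhoneLine]: known caller goes straight to the handset
        return ['ring handset']

    # [BananaPhone]: unknown caller has to key in the spoken 4-digit code
    steps = ['play challenge']
    if entered_code != expected_code:
        return steps + ['hang up']

    # code matched: only save caller IDs that pass verify_CID.py
    if cid_is_valid:
        steps.append('add to whitelist')
    return steps + ['ring handset']

print(route_call(True, '', '4071', True))
print(route_call(False, '4071', '4071', True))
print(route_call(False, '1234', '4071', True))
```

Note that a caller who passes the challenge still gets through even when their caller ID fails validation. They just don't get saved, so they'll be tested again next time.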

Testing


Testing here is pretty straightforward.

  1. Call the phone line attached to your Obi ATA
  2. Follow the instructions
  3. Input the code
  4. See if it rings
  5. If it does, call again. Your number should now be on the whitelist, so you shouldn't have to pass the test a second time

Debugging tips

I've tried to write an easy-to-follow tutorial, with little debug checks at the end of every section. However, it's easy to miss steps or forget stuff, so if your install doesn't work right the first time you try, go through these steps:

  • Are you able to see calls coming into Asterisk?
    • Try running the minimal dialplan from section 2 and see if calls come in. If not, try redoing the configurations on Asterisk and the Obi ATA until you can.
  • If verified caller IDs keep getting tested, your database probably isn't getting written to.
    • Are the permissions on the banana_phone.db file and its parent directory /home/banana/banana_phone set correctly?
  • If your Obi ATA isn't registering correctly with Asterisk,
    • Make sure your IP address, usernames and passwords for our SIP endpoints are correct
    • Restart Asterisk and check the status page on the Obi
  • The Asterisk debug console is your friend. Use it to check how calls are being processed, where they mess up in the dialplan, and if your ATA is connected
  • When editing extensions.conf, you must reload the dialplan from the Asterisk debug console for changes to take effect. Remember to use this command at the debug console:
dialplan reload

Conclusions


That's it for making Banana Phone. 

I'm a one-man army trying to fight phone scams. I'm also in the middle of commercializing my idea, so if this tutorial helped solve your robocall problem or you got some legitimately good information from this, please consider donating to the project. Beer money goes a long way ;)

Buy me a beer