Interrupts / callback not working as expected

3225 Views - Created 06/03/2019

06/03/2019

Posted by:
braydon_w


I am trying to set up a simple main menu / state machine using 4 buttons (connected to IO Pi Plus Bus 1 Port 0) and 4 LEDs (Bus 1 Port 1). I would like the program to return to a specific state (function) whenever one of the 4 buttons is pressed, even when other code is running. I have set up a simple test function that begins once the 2nd button is pressed, but when a button is pressed while test() is running, the callback doesn't occur until test() has finished printing...

Interrupt port 1A on IO Pi Plus Bus 1 is connected to GPIO pin 23 on the RPi.

I hope this all made sense! I'm still fairly new to this.




import time
import RPi.GPIO as GPIO
from IOPi import IOPi

bus1 = None

def button_pressed(interrupt_pin):
    global bus1

    """
    this function will be called when GPIO 23 falls low
    """
    # read the interrupt capture for port 0 and store it in variable intval
    intval = bus1.read_interrupt_capture(0)

    # compare the value of intval with the IO Pi port 0
    # using read_port().  wait until the port changes which will indicate
    # the button has been released.
    # without this while loop the function will keep repeating.

    while (intval == bus1.read_port(0)):
        time.sleep(0.2)

    # loop through each bit in the intval variable and check if the bit is 1
    # which will indicate a button has been pressed
    for num in range(0, 8):
        if (intval & (1 << num)):
#            print("Pin " + str(num + 1) + " pressed")

            if str(num + 1) == '1':
                return state0()

            elif str(num + 1) == '2':
                return state1()

            elif str(num + 1) == '3':
                return state2()

            elif str(num + 1) == '4':
                return state3()


def state0():
    print ("Pyano: Main Menu")
    bus1.write_pin(9, 1)
    bus1.write_pin(11, 0)
    bus1.write_pin(13, 0)
    bus1.write_pin(15, 0)

def state1():
    print ("Mode 1: midi_player")
    bus1.write_pin(9, 0)
    bus1.write_pin(11, 1)
    bus1.write_pin(13, 0)
    bus1.write_pin(15, 0)
    test()

def state2():
    print ("Mode 2: midi_maker")
    bus1.write_pin(9, 0)
    bus1.write_pin(11, 0)
    bus1.write_pin(13, 1)
    bus1.write_pin(15, 0)

def state3():
    print ("Mode 3: midi_live")
    bus1.write_pin(9, 0)
    bus1.write_pin(11, 0)
    bus1.write_pin(13, 0)
    bus1.write_pin(15, 1)

def test():
    i = 0
    while i < 3:
        print(i)
        time.sleep(2)
        i = i + 1

def main():
    """
    Main program function
    """
    global bus1
    # Create an instance of the IOPi class called bus1 and
    # set the I2C address to be 0x20 or Bus 1.
    bus1 = IOPi(0x20)
    # State LED indicator setup
    bus1.set_port_direction(1, 0x00)
    bus1.write_port(1, 0x00)
    # Set port 0 on the bus1 to be inputs with internal pull-ups enabled.
    bus1.set_port_pullups(0, 0xFF)
    bus1.set_port_direction(0, 0xFF)
    # Inverting the port will allow a button connected to ground to
    # register as 1 or on.
    bus1.invert_port(0, 0xFF)
    # Set the interrupt polarity to be active low so Int A and IntB go low
    # when an interrupt is triggered and mirroring disabled, so
    # Int A is mapped to port 0 and Int B is mapped to port 1
    bus1.set_interrupt_polarity(0)
    bus1.mirror_interrupts(0)
    # Set the interrupts default value to 0 so it will trigger when any of
    # the pins on the port 0 change to 1
    bus1.set_interrupt_defaults(0, 0x00)
    # Set the interrupt type to be 0xFF so an interrupt is
    # fired when the pin matches the default value
    bus1.set_interrupt_type(0, 0xFF)
    # Enable interrupts for all pins on port 0
    bus1.set_interrupt_on_port(0, 0xFF)
    # reset the interrupts on the IO Pi bus1
    bus1.reset_interrupts()
    # set the Raspberry Pi GPIO mode to be BCM
    GPIO.setmode(GPIO.BCM)
    # Set up GPIO 23 as an input. The pull-up resistor is disabled as the
    # level shifter will act as a pull-up.
    GPIO.setup(23, GPIO.IN, pull_up_down=GPIO.PUD_OFF)
    # when a falling edge is detected on GPIO 23 the function
    # button_pressed will be run
    GPIO.add_event_detect(23, GPIO.FALLING, callback=button_pressed, bouncetime=300)
    # print out a message and wait for keyboard input before
    # exiting the program

    state = state0  # initial state
    while state:
        state = state()  # launch state machine
    input("Press enter to exit program \n")

    #make sure all LEDs are turned off
    bus1.write_port(1, 0x00)

if __name__ == "__main__":
    main()

06/03/2019

Posted by:
andrew


Location:
United Kingdom


Hello

The problem is that your test function runs in the same thread as the GPIO event listener: button_pressed() calls state1(), which calls test(), so the callback does not return until test() has finished. Each time.sleep() call pauses that thread for 2 seconds, during which the event listener cannot respond to any button presses.

What you will need to do is split your code into two threads, with the GPIO event listener in one thread and your other program code in a second thread. I have modified your program in the code below to show how you can use threads to make your program more responsive.


import time
from threading import Thread
from threading import Event
import RPi.GPIO as GPIO
from IOPi import IOPi

bus1 = None

def button_pressed(interrupt_pin):
    global bus1

    """
    this function will be called when GPIO 23 falls low
    """
    # read the interrupt capture for port 0 and store it in variable intval
    intval = bus1.read_interrupt_capture(0)

    # compare the value of intval with the IO Pi port 0
    # using read_port().  wait until the port changes which will indicate
    # the button has been released.
    # without this while loop the function will keep repeating.

    while (intval == bus1.read_port(0)):
        time.sleep(0.2)

    # loop through each bit in the intval variable and check if the bit is 1
    # which will indicate a button has been pressed
    for num in range(0, 8):
        if (intval & (1 << num)):
            # print("Pin " + str(num + 1) + " pressed")

            if str(num + 1) == '1':
                return state0()

            elif str(num + 1) == '2':
                return state1()

            elif str(num + 1) == '3':
                return state2()

            elif str(num + 1) == '4':
                return state3()


def state0():
    print("Pyano: Main Menu")
    bus1.write_pin(9, 1)
    bus1.write_pin(11, 0)
    bus1.write_pin(13, 0)
    bus1.write_pin(15, 0)

def state1():
    print("Mode 1: midi_player")
    bus1.write_pin(9, 0)
    bus1.write_pin(11, 1)
    bus1.write_pin(13, 0)
    bus1.write_pin(15, 0)

def state2():
    print("Mode 2: midi_maker")
    bus1.write_pin(9, 0)
    bus1.write_pin(11, 0)
    bus1.write_pin(13, 1)
    bus1.write_pin(15, 0)

def state3():
    print("Mode 3: midi_live")
    bus1.write_pin(9, 0)
    bus1.write_pin(11, 0)
    bus1.write_pin(13, 0)
    bus1.write_pin(15, 1)

def test():
    i = 0
    while i < 100:
        print(i)
        e = Event()
        e.wait(timeout=2) 
        i = i + 1

def gpio_thread():
    # set the Raspberry Pi GPIO mode to be BCM
    GPIO.setmode(GPIO.BCM)

    # Set up GPIO 23 as an input. The pull-up resistor is disabled as the
    # level shifter will act as a pull-up.
    GPIO.setup(23, GPIO.IN, pull_up_down=GPIO.PUD_OFF)

    # when a falling edge is detected on GPIO 23 the function
    # button_pressed will be run
    GPIO.add_event_detect(23, GPIO.FALLING, callback=button_pressed, bouncetime=300)


def main():
    """
    Main program function
    """
    global bus1
    # Create an instance of the IOPi class called bus1 and
    # set the I2C address to be 0x20 or Bus 1.
    bus1 = IOPi(0x20)
    # State LED indicator setup
    bus1.set_port_direction(1, 0x00)
    bus1.write_port(1, 0x00)
    # Set port 0 on the bus1 to be inputs with internal pull-ups enabled.
    bus1.set_port_pullups(0, 0xFF)
    bus1.set_port_direction(0, 0xFF)
    # Inverting the port will allow a button connected to ground to
    # register as 1 or on.
    bus1.invert_port(0, 0xFF)
    # Set the interrupt polarity to be active low so Int A and IntB go low
    # when an interrupt is triggered and mirroring disabled, so
    # Int A is mapped to port 0 and Int B is mapped to port 1
    bus1.set_interrupt_polarity(0)
    bus1.mirror_interrupts(0)
    # Set the interrupts default value to 0 so it will trigger when any of
    # the pins on the port 0 change to 1
    bus1.set_interrupt_defaults(0, 0x00)
    # Set the interrupt type to be 0xFF so an interrupt is
    # fired when the pin matches the default value
    bus1.set_interrupt_type(0, 0xFF)
    # Enable interrupts for all pins on port 0
    bus1.set_interrupt_on_port(0, 0xFF)
    # reset the interrupts on the IO Pi bus1
    bus1.reset_interrupts()
    
    #make sure all LEDs are turned off
    bus1.write_port(1, 0x00)

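    # pass the function itself as the target; writing gpio_thread() would
    # call it immediately in the main thread instead of in the new thread
    thread1 = Thread(target=gpio_thread)
    thread1.start()

    thread2 = Thread(target=test)
    thread2.start()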

    input("Press enter to exit program \n")

    

if __name__ == "__main__":
    main()
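
As a hardware-free illustration of the blocking described above, here is a minimal sketch. The dispatcher function and the callback names are hypothetical stand-ins, not part of RPi.GPIO or the IOPi library; the sketch only assumes that events are delivered one after another by a single thread. A callback that does the slow work itself holds that thread for the whole duration, while a callback that hands the work to a new thread returns almost immediately.

```python
import threading
import time


def run_callbacks_sequentially(callbacks):
    """Hypothetical stand-in for an event listener that delivers events
    one at a time. Returns how long each callback held the listener."""
    held = []
    for callback in callbacks:
        start = time.monotonic()
        callback()
        held.append(time.monotonic() - start)
    return held


def slow_task():
    # stands in for test() with its time.sleep() calls
    time.sleep(0.3)


workers = []


def blocking_callback():
    # the listener is stuck here until slow_task() has finished
    slow_task()


def handoff_callback():
    # pass the function itself (slow_task, not slow_task()) so that it
    # runs in the new thread and the callback can return straight away
    worker = threading.Thread(target=slow_task)
    worker.start()
    workers.append(worker)


blocking_time, handoff_time = run_callbacks_sequentially(
    [blocking_callback, handoff_callback]
)

# wait for the background work before the script ends
for worker in workers:
    worker.join()

print("blocking callback held the listener for %.2f s" % blocking_time)
print("hand-off callback held the listener for %.2f s" % handoff_time)
```

The first figure should be close to the length of the slow task and the second close to zero, which is the behaviour you want from a button handler.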

06/03/2019

Posted by:
braydon_w


How can I stop the test thread in this case?



This is what I'm trying to do:

Make a super simplified state machine with 4 states, each with its own button and LED indicator.

The first state is the main menu, where the program is just waiting for a button so it can go to a more complex state.

The next 3 states will all be their own Python program, essentially. I would ideally like to have them in separate .py files if possible. I have already written a lot of the code for these programs (they also use time.sleep, which I see is a problem).

I want to be able to press the main menu button at any time and have all code execution stop and the program return to the main menu state.

06/03/2019

Posted by:
andrew


Location:
United Kingdom


Normally you wouldn't kill a thread directly, as it may have cleanup to do, and stopping it abruptly could result in memory leaks or crashes. The best way is to have a variable called exitRequested which is set to True when you want the thread to exit. In your thread, check exitRequested regularly and, when it is set, end the thread by breaking out of your loop and returning from the function.

I have written a small program based on the description of what you need.


import time
from threading import Thread
from threading import Event
import RPi.GPIO as GPIO
from IOPi import IOPi

bus1 = None

runningThreads = []  # used to store a list of the running threads


class State1Thread(Thread):
    exitRequested = False

    def run(self):
        print("Mode 1: midi_player")
        i = 0
        while i < 5:
            print("Mode 1 running - " + str(i))
            time.sleep(1)
            if self.exitRequested:
                break
            i = i + 1

        # thread cleanup - run this before returning from the thread.
        # remove this thread directly rather than removing items from
        # runningThreads while looping over it
        if self in runningThreads:
            runningThreads.remove(self)
        return

    def exit(self):
        self.exitRequested = True
        print("Mode 1: exit")


class State2Thread (Thread):
    exitRequested = False

    def run(self):
        self.hasStarted = True
        self.isRunning = True
        print("Mode 2: midi_maker")
        i = 0
        while i < 5:
            print("Mode 2 running - " + str(i))
            time.sleep(1)
            if self.exitRequested:
                break
            i = i + 1
        # thread has finished so remove the thread from the runningThreads array
        if self in runningThreads:
            runningThreads.remove(self)
        return

    def exit(self):
        self.exitRequested = True
        print("Mode 2: exit")


class State3Thread (Thread):
    exitRequested = False

    def run(self):
        self.hasStarted = True
        self.isRunning = True
        print("Mode 3: midi_live")
        i = 0
        while i < 5:
            print("Mode 3 running - " + str(i))
            time.sleep(1)
            if self.exitRequested:
                break
            i = i + 1
        # thread has finished so remove the thread from the runningThreads array
        if self in runningThreads:
            runningThreads.remove(self)
        return

    def exit(self):
        self.exitRequested = True
        print("Mode 3: exit")


def button_pressed(interrupt_pin):
    global bus1

    """
    this function will be called when GPIO 23 falls low
    """
    # read the interrupt capture for port 0 and store it in variable intval
    intval = bus1.read_interrupt_capture(0)

    # compare the value of intval with the IO Pi port 0
    # using read_port().  wait until the port changes which will indicate
    # the button has been released.
    # without this while loop the function will keep repeating.

    while (intval == bus1.read_port(0)):
        time.sleep(0.2)

    # loop through each bit in the intval variable and check if the bit is 1
    # which will indicate a button has been pressed
    for num in range(0, 8):
        if (intval & (1 << num)):
            # print("Pin " + str(num + 1) + " pressed")

            if str(num + 1) == '1':
                return state0_pressed()

            elif str(num + 1) == '2':
                return state1_pressed()

            elif str(num + 1) == '3':
                return state2_pressed()

            elif str(num + 1) == '4':
                return state3_pressed()


def state0_pressed():
    print("Main Menu - stopping tasks")
    bus1.write_port(1, 0x01)
    # call exit for each thread in runningThreads; loop over a copy because
    # threads remove themselves from the list as they finish
    for t in list(runningThreads):
        t.exit()


def state1_pressed():
    bus1.write_port(1, 0x04)
    # check if a thread is already running for state 1
    for t in runningThreads:
        if t.name == "State1" and t.is_alive():
            return

    # create a new state 1 thread and append it to the runningThreads array
    state = State1Thread()
    state.name = "State1"
    runningThreads.append(state)

    # start the thread
    state.start()


def state2_pressed():
    bus1.write_port(1, 0x10)
    # check if a thread is already running for state 2
    for t in runningThreads:
        if t.name == "State2" and t.is_alive():
            return
    
    # create a new state 2 thread and append it to the runningThreads array
    state = State2Thread()
    state.name = "State2"
    runningThreads.append(state)

    # start the thread
    state.start()


def state3_pressed():
    bus1.write_port(1, 0x40)
    # check if a thread is already running for state 3
    for t in runningThreads:
        if t.name == "State3" and t.is_alive():
            return
    
    # create a new state 3 thread and append it to the runningThreads array
    state = State3Thread()
    state.name = "State3"
    runningThreads.append(state)

    # start the thread
    state.start()


def main():
    """
    Main program function
    """
    global bus1
    bus1 = IOPi(0x20)
    
    bus1.set_port_direction(1, 0x00)
    bus1.write_port(1, 0x00)
    
    bus1.set_port_pullups(0, 0xFF)    
    bus1.set_port_direction(0, 0xFF)
    bus1.invert_port(0, 0xFF)
    
    bus1.set_interrupt_polarity(0)
    bus1.mirror_interrupts(0)
    bus1.set_interrupt_defaults(0, 0x00)
    bus1.set_interrupt_type(0, 0xFF)
    bus1.set_interrupt_on_port(0, 0xFF)
    bus1.reset_interrupts()

    # set the Raspberry Pi GPIO mode to be BCM
    GPIO.setmode(GPIO.BCM)

    # Set up GPIO 23 as an input. The pull-up resistor is disabled as the
    # level shifter will act as a pull-up.
    GPIO.setup(23, GPIO.IN, pull_up_down=GPIO.PUD_OFF)

    # when a falling edge is detected on GPIO 23 the function
    # button_pressed will be run
    GPIO.add_event_detect(23, GPIO.FALLING, callback=button_pressed, bouncetime=300)

    input("Press enter to exit program \n")

    

if __name__ == "__main__":
    main()


At the top of the program is an array called runningThreads, which is used to store a list of the threads that are running. A thread can only be started once, so you need to create a new instance of the thread each time you want it to start.

There are three classes: State1Thread, State2Thread, and State3Thread. These are where you would put the code that you want to run on each button press. Each class has two functions. The run function is called automatically when the thread is started. The exit function sets the exitRequested variable to True, which you will need to monitor in your code so you know when to exit the thread early. There is also a small amount of thread cleanup code that needs to run before exiting a thread: it removes the current thread from the runningThreads array before calling return.
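
Since the existing mode code relies on time.sleep(), one possible variation on the exitRequested flag is sketched below. It is only an illustration under stated assumptions: the ModeThread class, its parameters and the stop_requested attribute are hypothetical names, not part of the program above. It swaps the boolean for a threading.Event, whose wait(timeout) pauses like time.sleep() but returns True as soon as the event is set, so the thread can stop in the middle of a pause instead of waiting for it to end.

```python
import threading
import time


class ModeThread(threading.Thread):
    """Hypothetical mode thread that can be asked to stop early."""

    def __init__(self, name, steps=5, step_seconds=1.0):
        super().__init__(name=name)
        self.steps = steps
        self.step_seconds = step_seconds
        self.completed_steps = 0
        # plays the role of the exitRequested variable
        self.stop_requested = threading.Event()

    def run(self):
        for _ in range(self.steps):
            # pause for up to step_seconds; wait() returns True immediately
            # if exit() sets the event during the pause
            if self.stop_requested.wait(timeout=self.step_seconds):
                break
            self.completed_steps += 1

    def exit(self):
        self.stop_requested.set()


mode = ModeThread("State1", steps=50, step_seconds=2.0)
start = time.monotonic()
mode.start()

time.sleep(0.1)   # pretend the main menu button is pressed shortly after
mode.exit()
mode.join()

elapsed = time.monotonic() - start
print("stopped after %.2f s, %d steps completed" % (elapsed, mode.completed_steps))
```

With a plain time.sleep(2) inside the loop, the same exit request would not be noticed until the current 2 second pause had finished.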

Each button calls its own function, state0_pressed(), state1_pressed(), state2_pressed() and state3_pressed().

state0_pressed() loops through the runningThreads array and calls exit for each thread. This is the cleanest way of making all of the threads exit.

The other three functions first check whether the thread for that function is in the runningThreads array and is currently alive. The for loop is needed to make sure you don't accidentally start more than one instance of the thread. If the thread is not running, the function creates a new instance of the thread, gives it a name, appends it to runningThreads and starts it.
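
The check-then-start pattern described above can also be sketched with a dictionary keyed by thread name instead of a loop over an array. This is an alternative arrangement rather than the code from the program above, and the names running, running_lock and start_once are hypothetical. The lock is there on the assumption that two button presses could be handled close together.

```python
import threading

running = {}                     # thread name -> most recent Thread object
running_lock = threading.Lock()


def start_once(name, target):
    """Start target in a thread called name unless one is still alive.
    Returns True if a new thread was started, False otherwise."""
    with running_lock:
        existing = running.get(name)
        if existing is not None and existing.is_alive():
            return False
        # a thread can only be started once, so create a fresh instance
        thread = threading.Thread(target=target, name=name)
        running[name] = thread
        thread.start()
        return True


# demonstration: the work simply waits until it is released
release = threading.Event()

first = start_once("State1", release.wait)    # nothing running yet
second = start_once("State1", release.wait)   # first thread is still alive

release.set()
running["State1"].join()

third = start_once("State1", release.wait)    # previous thread has finished
running["State1"].join()

print(first, second, third)
```

Because a finished thread is simply replaced the next time its name is used, this arrangement does not need the cleanup step that removes threads from the array.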

I moved the GPIO code back into the main function as it does not need to be in its own thread.

Using threads in Python is fairly complicated, but hopefully this code will get you started with your program.

08/03/2019

Posted by:
braydon_w


Thank you so much! This is exactly what I needed to get started.
