This project uses public dweet.io service to control an IoT device (in this case, the Raspi model 3B) via mobile phone. Basically the user enter the dweet API (which contains the device ID) into the mobile phone browser, the dweet service will receive the request and send the data(in json format) to the device.
I learn this from Gary Smart book, Practical Python Programming for IoT. Excellent book to learn IoTs using Python.
- Run the Python script, the script will send a request to dweet.io for the latest dweet for this device aka ‘thing’, if any.
- User can enter the dweet API URL (which contains the device ID and key value pair) into the mobile phone browser. It has 3 states: ‘on’, ‘off’ and ‘blink’.
- The dweet service will receive the request and send the data(in json format) to the device, which is the Raspberry Pi, in this case.
- The python script will parse the json data and look for the key ‘state’, and get the value. Based on the value, the device will trigger the Led to on, off or blink.
The source code:
"""
Code taken from Gary Smart book , Practical Python Programming for IoT.
An excellent book for hobbist like me to combine Python with IoT devices/application - ******
"""
import signal
import json
import os
import sys
import logging
from gpiozero import Device, LED
from gpiozero.pins.pigpio import PiGPIOFactory
from time import sleep
from uuid import uuid1
import requests # (1)
# Global Variables
LED_GPIO_PIN = 21 # GPIO Pin 21 on Raspi 3B, pin number is 40(last pin on outer rail)
THING_NAME_FILE = 'thing_name.txt' # name of file that stores out device ID
URL = 'https://dweet.io' # Dweet.io service API
last_led_state = None # Current state of LED ("on", "off", "blinking")
thing_name = None # Thing name (as persisted in THING_NAME_FILE)
led = None # GPIOZero LED instance
# Initialize Logging, set level to INFO
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger('main') # Using logger for this module
logger.setLevel(logging.INFO)
# Initialize GPIO
Device.pin_factory = PiGPIOFactory()
# Function Definitions
def init_led(): #Initial and assign led variable
global led
led = LED(LED_GPIO_PIN)
led.off()
def resolve_thing_name(thing_file):
if os.path.exists(thing_file): # if file contains ID, reuse ID for 'thing'(device - Raspi 3B)
with open(thing_file, 'r') as file_handle:
name = file_handle.read()
logger.info('Thing name ' + name + ' loaded from ' + thing_file)
return name.strip()
else:
name = str(uuid1())[:8] # UUID object to string. # Else produce a 8 digit ID and write to file,
logger.info('Created new thing name ' + name) # for future use(reuse)
with open(thing_file, 'w') as f:
f.write(name)
return name
def get_latest_dweet():
resource = URL + '/get/latest/dweet/for/' + thing_name # An API request to dweet.io for device latest state
logger.debug('Getting last dweet from url %s', resource) # dweet last for 24hrs (free version)
r = requests.get(resource)
if r.status_code == 200: # if Status is ok, get data(json format)
dweet = r.json()
logger.debug('Last dweet for thing was %s', dweet)
dweet_content = None
if dweet['this'] == 'succeeded': # if there is 'succeeded' text in ['this'],
dweet_content = dweet['with'][0]['content'] # get the key-value pair in ['content']
return dweet_content
else:
logger.error('Getting last dweet failed with http status %s', r.status_code)
return {}
def poll_dweets_forever(delay_secs=2):
while True: # to consistently ping for dweet for our device
dweet = get_latest_dweet()
if dweet is not None:
process_dweet(dweet)
sleep(delay_secs)
def process_dweet(dweet):
global last_led_state
# to get the return from get_latest_dweet()and
# extract the ['state'] value. Can be 'on', 'off' or 'blink'.
if not 'state' in dweet:
return
led_state = dweet['state']
if led_state == last_led_state:
return # if LED is already in requested state.
if led_state == 'on':
led.on()
elif led_state == 'blink':
led.blink()
else: # Off, including any unhandled state.
led_state = 'off'
led.off()
if led_state != last_led_state:
last_led_state = led_state
logger.info('LED ' + led_state)
def print_instructions(): # some info for user
"""Print instructions to terminal."""
print("LED Control URLs - Try them in your web browser:")
print(" On : " + URL + "/dweet/for/" + thing_name + "?state=on")
print(" Off : " + URL + "/dweet/for/" + thing_name + "?state=off")
print(" Blink : " + URL + "/dweet/for/" + thing_name + "?state=blink\n")
def signal_handler(sig, frame):
print('You pressed Control+C') # to handle and capture when Ctrl C is pressed,
led.off # which is to exit the application for Linux/Unix OS
sys.exit(0)
# Initialise led variable and get or set device ID
thing_name = resolve_thing_name(THING_NAME_FILE) # this thing_name will be use in get_latest_dweet().
init_led()
# Main entry point
if __name__ == '__main__':
signal.signal(signal.SIGINT, signal_handler) # Capture CTRL + C
print_instructions()
# Initialise LED from last dweet.
last_dweet = get_latest_dweet()
if (last_dweet):
process_dweet(last_dweet)
print('Waiting for dweets. Press Control+C to exit.')
poll_dweets_forever() # Get dweets by polling a URL on a schedule
# by pinging dweet.io consistently using get_latest_dweet()
The video from my Youtube channel: