GitHub - ArmDeveloperEcosystem/workshop-edge-ai-pi · GitHub
Skip to content

ArmDeveloperEcosystem/workshop-edge-ai-pi

Folders and files

Repository files navigation

Edge AI 101 Workshop: Raspberry Pi Smart Camera

Build a real edge-AI smart camera on Raspberry Pi that detects objects locally and turns those detections into application actions. The workshop works on a standard Raspberry Pi, with an optional AI HAT+ 2 path for participants who want to explore hardware-accelerated inference.


Workshop goals

By the end of the workshop, participants should understand how to build a local edge-AI application using a Raspberry Pi camera. The core pattern is:

Camera frame
  - Detector backend
  - Real detections: label, confidence, box
  - Local rule/action logic

There are two supported paths. We run the Raspberry Pi-only (CPU) path first, then add the AI HAT+ 2 to show the speed-up from hardware acceleration:

Option A: Raspberry Pi only
    Camera -> CPU/TFLite -> real detections -> local actions

Option B: Raspberry Pi 5 + AI HAT+ 2
    Camera -> Hailo NPU -> real detections -> local actions

Hardware and software assumptions

Raspberry Pi-only path

Recommended setup:

Raspberry Pi 5 preferred
Raspberry Pi 4 possible, but slower
Camera Module 3 or compatible Raspberry Pi camera
Raspberry Pi OS 64-bit
SSH access

AI HAT+ 2 path (optional acceleration)

Recommended setup:

Raspberry Pi 5
AI HAT+ 2
Camera Module 3 NoIR Wide, or compatible Raspberry Pi camera
Raspberry Pi OS 64-bit
SSH access
VLC on computer

1. Shared base setup

Run these commands on the Raspberry Pi.

sudo apt update
sudo apt full-upgrade -y
sudo rpi-eeprom-update -a
sudo reboot

Reconnect after reboot:

ssh <username>@<pi-hostname-or-ip>

2. Install shared tools

After reconnecting:

sudo apt update
sudo apt install -y \
  rpicam-apps \
  ffmpeg \
  wget \
  curl \
  git \
  python3 \
  python3-pip \
  python3-venv \
  python3-picamera2 \
  python3-opencv

Verify 64-bit OS:

uname -m
cat /etc/os-release

Expected output:

aarch64

Verify camera:

rpicam-hello --list-cameras
rpicam-still -o camera-test.jpg
ls -lh camera-test.jpg

Copy the test image to your computer:

scp <username>@<pi-hostname-or-ip>:~/camera-test.jpg .

Open the image locally.

3. Shared local rule logic

Create a reusable rules module that both backends can call.

cd ~
vi smart_desk_rules.py

Paste:

#!/usr/bin/env python3

import time


class SmartDeskRules:
    def __init__(self, confidence_threshold=0.70, empty_seconds=10.0, print_interval=1.0):
        self.confidence_threshold = confidence_threshold
        self.empty_seconds = empty_seconds
        self.print_interval = print_interval

        self.last_seen_time = time.time()
        self.last_print_time = 0.0
        self.last_labels = set()

    def handle(self, detections, backend_name):
        """
        detections is a list of dictionaries:

        {
            "label": "person",
            "confidence": 0.91,
            "box": [x1, y1, x2, y2]
        }
        """

        now = time.time()

        confident = [
            d for d in detections
            if d.get("confidence", 0.0) >= self.confidence_threshold
        ]

        if confident:
            self.last_seen_time = now

        labels = {d["label"] for d in confident}

        should_print = (
            (now - self.last_print_time) >= self.print_interval
            or labels != self.last_labels
        )

        if should_print:
            self.last_print_time = now
            self.last_labels = labels

            print()
            print(f"Backend: {backend_name}")

            if confident:
                for d in confident:
                    print(
                        f'  detected {d["label"]} '
                        f'confidence={d["confidence"]:.2f} '
                        f'box={d.get("box")}'
                    )
            else:
                print("  no confident detections")

            if "person" in labels:
                print("LOCAL ACTION: Person present -> mark workspace occupied")

            if "bottle" in labels:
                print("LOCAL ACTION: Bottle detected -> inventory item present")

            if "cell phone" in labels:
                print("LOCAL ACTION: Phone detected -> log device on table")

            if "laptop" in labels:
                print("LOCAL ACTION: Laptop detected -> workstation active")

        if now - self.last_seen_time >= self.empty_seconds:
            print()
            print(f"Backend: {backend_name}")
            print(f"LOCAL ACTION: No confident objects for {self.empty_seconds:.0f} seconds -> shelf empty")
            self.last_seen_time = now

Test that Python can import it:

python3 - <<'PY'
from smart_desk_rules import SmartDeskRules
rules = SmartDeskRules()
rules.handle([{"label": "person", "confidence": 0.91, "box": [0, 0, 1, 1]}], "test")
PY

Option A: Raspberry Pi-only CPU path

This path is for attendees without the HAT. It still performs real object detection, but on CPU.


A1. Create a CPU-only virtual environment

cd ~

python3 -m venv ~/venv_tflite --system-site-packages
source ~/venv_tflite/bin/activate

pip install --upgrade pip
pip install numpy

Install the LiteRT runtime:

pip install ai-edge-litert

A2. Download a small COCO object-detection model

Use a small model for CPU.

mkdir -p ~/tflite_models
cd ~/tflite_models

Download a common SSD MobileNet COCO TFLite model:

wget -O coco_ssd_mobilenet_v1_1.0_quant_2018_06_29.zip \
  https://storage.googleapis.com/download.tensorflow.org/models/tflite/coco_ssd_mobilenet_v1_1.0_quant_2018_06_29.zip

unzip -o coco_ssd_mobilenet_v1_1.0_quant_2018_06_29.zip
ls -l

You should see files like:

detect.tflite
labelmap.txt

A3. Create the CPU detection script

cd ~
vi smart_desk_tflite.py

Paste:

#!/usr/bin/env python3

import argparse
import time

import cv2
import numpy as np
from picamera2 import Picamera2

from smart_desk_rules import SmartDeskRules


def load_interpreter(model_path):
    # Try backends in order. tflite_runtime is the classic option, ai_edge_litert
    # is its modern successor (and the one with Python 3.13 wheels), and full
    # tensorflow is the heavy fallback. All expose the same Interpreter API.
    try:
        from tflite_runtime.interpreter import Interpreter
        print("Using tflite_runtime.Interpreter")
        return Interpreter(model_path=model_path)
    except ImportError:
        pass

    try:
        from ai_edge_litert.interpreter import Interpreter
        print("Using ai_edge_litert.Interpreter")
        return Interpreter(model_path=model_path)
    except ImportError:
        pass

    import tensorflow as tf
    print("Using tensorflow.lite.Interpreter")
    return tf.lite.Interpreter(model_path=model_path)


def load_labels(label_path):
    with open(label_path, "r", encoding="utf-8") as f:
        labels = [line.strip() for line in f.readlines()]
    return labels


def set_input_tensor(interpreter, image):
    input_details = interpreter.get_input_details()[0]
    tensor_index = input_details["index"]

    input_shape = input_details["shape"]
    height = int(input_shape[1])
    width = int(input_shape[2])

    resized = cv2.resize(image, (width, height))

    if input_details["dtype"] == np.float32:
        resized = resized.astype(np.float32)
        resized = (resized - 127.5) / 127.5
    else:
        resized = resized.astype(input_details["dtype"])

    resized = np.expand_dims(resized, axis=0)
    interpreter.set_tensor(tensor_index, resized)


def get_output_tensors(interpreter):
    output_details = interpreter.get_output_details()
    outputs = []
    for detail in output_details:
        outputs.append(interpreter.get_tensor(detail["index"]))
    return outputs


def parse_ssd_outputs(interpreter, labels, frame_width, frame_height, threshold):
    """
    Typical SSD MobileNet TFLite outputs:
      boxes:       [1, N, 4] with ymin, xmin, ymax, xmax normalized
      classes:     [1, N]
      scores:      [1, N]
      num_outputs: [1]
    """

    outputs = get_output_tensors(interpreter)

    boxes = outputs[0][0]
    classes = outputs[1][0]
    scores = outputs[2][0]

    detections = []

    for i, score in enumerate(scores):
        confidence = float(score)

        if confidence < threshold:
            continue

        class_id = int(classes[i])

        # This SSD MobileNet model emits 0-based COCO class ids (0 == person).
        # labelmap.txt has a leading "???" placeholder at index 0, so the
        # matching label is labels[class_id + 1], not labels[class_id].
        label_index = class_id + 1
        if label_index < len(labels):
            label = labels[label_index]
        else:
            label = f"class_{class_id}"

        if label == "???":
            continue

        ymin, xmin, ymax, xmax = boxes[i]

        x1 = int(max(0, xmin) * frame_width)
        y1 = int(max(0, ymin) * frame_height)
        x2 = int(min(1, xmax) * frame_width)
        y2 = int(min(1, ymax) * frame_height)

        detections.append(
            {
                "label": label,
                "confidence": confidence,
                "box": [x1, y1, x2, y2],
            }
        )

    return detections


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--model",
        default="/home/gabriel/tflite_models/detect.tflite",
        help="Path to .tflite model",
    )
    parser.add_argument(
        "--labels",
        default="/home/gabriel/tflite_models/labelmap.txt",
        help="Path to labelmap.txt",
    )
    parser.add_argument(
        "--threshold",
        type=float,
        default=0.50,
        help="Confidence threshold for local rules",
    )
    parser.add_argument(
        "--empty-seconds",
        type=float,
        default=10.0,
        help="Seconds without confident detections before shelf-empty action",
    )
    parser.add_argument(
        "--width",
        type=int,
        default=640,
        help="Camera capture width",
    )
    parser.add_argument(
        "--height",
        type=int,
        default=480,
        help="Camera capture height",
    )
    parser.add_argument(
        "--interval",
        type=float,
        default=0.5,
        help="Seconds between inference attempts",
    )

    args = parser.parse_args()

    labels = load_labels(args.labels)

    interpreter = load_interpreter(args.model)
    interpreter.allocate_tensors()

    rules = SmartDeskRules(
        confidence_threshold=args.threshold,
        empty_seconds=args.empty_seconds,
        print_interval=1.0,
    )

    picam2 = Picamera2()
    config = picam2.create_preview_configuration(
        main={"size": (args.width, args.height), "format": "RGB888"}
    )
    picam2.configure(config)
    picam2.start()

    print("CPU/TFLite smart desk detector running.")
    print("Press Ctrl+C to stop.")

    try:
        while True:
            frame = picam2.capture_array()
            frame_height, frame_width = frame.shape[:2]

            set_input_tensor(interpreter, frame)

            start = time.time()
            interpreter.invoke()
            inference_ms = (time.time() - start) * 1000.0

            detections = parse_ssd_outputs(
                interpreter=interpreter,
                labels=labels,
                frame_width=frame_width,
                frame_height=frame_height,
                threshold=args.threshold,
            )

            rules.handle(detections, backend_name=f"tflite_cpu inference={inference_ms:.1f}ms")

            time.sleep(args.interval)

    except KeyboardInterrupt:
        print()
        print("Stopping.")
    finally:
        picam2.stop()


if __name__ == "__main__":
    main()

If your username is not gabriel, either pass --model and --labels explicitly or edit the defaults.

Make it executable:

chmod +x smart_desk_tflite.py

Run it:

source ~/venv_tflite/bin/activate

python3 ~/smart_desk_tflite.py \
  --model ~/tflite_models/detect.tflite \
  --labels ~/tflite_models/labelmap.txt \
  --threshold 0.50

Expected output (with a person in frame):

Using ai_edge_litert.Interpreter
CPU/TFLite smart desk detector running.
Press Ctrl+C to stop.

Backend: tflite_cpu inference=85.0ms
  detected person confidence=0.63 box=[...]
LOCAL ACTION: Person present -> mark workspace occupied

A4. CPU-only troubleshooting

Interpreter install fails

The script tries tflite_runtime, then ai_edge_litert, then tensorflow. On current Raspberry Pi OS (Python 3.13) the working package is ai-edge-litert:

pip install ai-edge-litert

If that somehow fails, fall back to full TensorFlow (much larger):

pip install tensorflow

The script will then use tensorflow.lite.Interpreter automatically.

Camera busy

Stop any other camera app:

pkill -f rpicam-vid
pkill -f hailo-detect
pkill -f smart_desk_detection.py
pkill -f smart_desk_tflite.py

Then retry.

It is too slow

Use a smaller camera frame:

python3 ~/smart_desk_tflite.py \
  --model ~/tflite_models/detect.tflite \
  --labels ~/tflite_models/labelmap.txt \
  --width 320 \
  --height 240 \
  --interval 0.2

It detects odd labels

The model is COCO-trained. Use common COCO objects:

person
bottle
cup
chair
laptop
keyboard
mouse
cell phone
book
backpack

Option B: AI HAT+ 2 accelerated path

This path uses the AI HAT+ 2 for accelerated inference.


B1. Install AI HAT+ 2 software

For AI HAT+ 2, install:

# hailo-h10-all only for HAT+ 2, for Original AI Hat / AI Kit, hailo-all
sudo apt update
sudo apt install -y dkms
sudo apt install -y hailo-h10-all
sudo reboot

Reconnect:

ssh <username>@<pi-hostname-or-ip>

Verify Hailo:

hailortcli fw-control identify
dmesg | grep -i hailo

You should see Hailo device information. On AI HAT+ 2, expect Hailo-10H references.

Important package distinction:

AI HAT+ 2:
    hailo-h10-all

Original AI HAT / AI Kit:
    hailo-all

Do not install both on the same OS image.

B2. Verify rpicam Hailo assets

ls /usr/share/rpi-camera-assets/ | grep hailo

You want to see assets such as:

hailo_yolov8_inference.json
hailo_yolov6_inference.json
hailo_yolox_inference.json
hailo_yolov8_pose.json

Quick no-preview smoke test:

rpicam-hello -t 5000 \
  --nopreview \
  --post-process-file /usr/share/rpi-camera-assets/hailo_yolov8_inference.json

If you later get this error:

ERROR: *** Low res image larger than video ***

use explicit dimensions on video commands:

--width 1280 --height 720

B3. Install MediaMTX for live RTSP streaming

MediaMTX is an open-source media server/router. In this workshop, it gives us a reliable RTSP stream that VLC can open.

Do not guess the release filename. Ask GitHub for the current Arm asset:

cd ~

python3 - <<'PY'
import json
import urllib.request

url = "https://api.github.com/repos/bluenviron/mediamtx/releases/latest"
data = json.load(urllib.request.urlopen(url))

print("Latest:", data["tag_name"])
print()

for asset in data["assets"]:
    name = asset["name"]
    if "linux" in name.lower() and ("arm64" in name.lower() or "aarch64" in name.lower()):
        print(name)
        print(asset["browser_download_url"])
PY

In testing, this returned:

mediamtx_v1.19.1_linux_arm64.tar.gz
https://github.com/bluenviron/mediamtx/releases/download/v1.19.1/mediamtx_v1.19.1_linux_arm64.tar.gz

Download and extract the URL printed by the script. Example using the tested version:

cd ~

wget https://github.com/bluenviron/mediamtx/releases/download/v1.19.1/mediamtx_v1.19.1_linux_arm64.tar.gz

rm -rf ~/mediamtx
mkdir -p ~/mediamtx

tar -xzf mediamtx_v1.19.1_linux_arm64.tar.gz -C ~/mediamtx

cd ~/mediamtx
ls -l

Start MediaMTX:

./mediamtx

Leave this terminal open.

B4. Live visual demo: AI HAT+ 2 object detection to VLC

Open a second SSH session:

ssh <username>@<pi-hostname-or-ip>

Start the RTSP publisher:

rpicam-vid -t 0 \
  --nopreview \
  --width 1280 --height 720 \
  --framerate 30 \
  --inline \
  --codec libav \
  --libav-format rtsp \
  --post-process-file /usr/share/rpi-camera-assets/hailo_yolov8_inference.json \
  -o rtsp://127.0.0.1:8554/smart-desk

On your computer, open VLC:

rtsp://<pi-ip-address>:8554/smart-desk

B5. Backup visual demo: record MP4

If live streaming fails, record a 10-second annotated MP4:

rpicam-vid -t 10000 \
  --nopreview \
  --width 1280 --height 720 \
  --framerate 30 \
  --codec libav \
  --libav-format mp4 \
  --post-process-file /usr/share/rpi-camera-assets/hailo_yolov8_inference.json \
  -o smart-desk-demo.mp4

Check duration:

ffprobe smart-desk-demo.mp4

Copy it to the computer:

scp <username>@<pi-hostname-or-ip>:~/smart-desk-demo.mp4 .

B6. Install Hailo Apps for real callback-driven app logic

Stop the RTSP publisher first. It uses both the camera and Hailo NPU.

In the terminal running rpicam-vid, press:

Ctrl+C

Or clean up:

pkill -f rpicam-vid
pkill -f hailo-detect
pkill -f detection.py

Now install Hailo Apps:

cd ~

sudo apt update
sudo apt install -y git python3-venv python3-pip gir1.2-gtk-3.0

git clone https://github.com/hailo-ai/hailo-apps.git
cd hailo-apps

For this workshop we only need one small detection model. Scope the download to the simple_detection group, which is a single ~4 MB model (yolov6n):

sed -i 's/download_group: "default"/download_group: "simple_detection"/' \
  hailo_apps/config/config.yaml

Confirm the change:

grep download_group hailo_apps/config/config.yaml
# expect: download_group: "simple_detection"

Optional: leaner, faster install

sed -i -E '/^\s*"(lancedb|tokenizers|scipy|lap|cython_bbox)/d' pyproject.toml

It deletes a handful of heavy Python dependencies (lancedb, tokenizers, scipy, lap, cython_bbox) from the project file before install. Those are only used by hailo-apps' gen-AI/audio apps.

Now install:

sudo ./install.sh
source setup_env.sh

Smoke test:

hailo-detect --input rpi --hef-path yolov6n

If you see errors like:

Failed to create vdevice. there are not enough free devices.
Device or resource busy

another process still owns the Hailo NPU or camera. Run:

ps aux | grep -E "rpicam|hailo|python|mediamtx" | grep -v grep
pkill -f rpicam-vid
pkill -f hailo-detect
pkill -f detection.py

MediaMTX itself can stay running, but the rpicam-vid publisher must be stopped.

If still stuck:

sudo reboot

Then reconnect and retry:

cd ~/hailo-apps
source setup_env.sh
hailo-detect --input rpi --hef-path yolov6n

B7. Create the real Hailo smart-desk app

Make a copy of the detection app:

cd ~/hailo-apps/hailo_apps/python/pipeline_apps/detection
cp detection.py smart_desk_detection.py

Copy the shared rules file into this directory so imports are simple:

cp ~/smart_desk_rules.py .

Edit:

vi smart_desk_detection.py

Find the existing app_callback function. Replace it with this version:

# -----------------------------------------------------------------------------------------------
# User-defined callback function
# -----------------------------------------------------------------------------------------------
def app_callback(element, buffer, user_data):
    import os
    import sys

    current_dir = os.path.dirname(os.path.abspath(__file__))
    if current_dir not in sys.path:
        sys.path.append(current_dir)

    from smart_desk_rules import SmartDeskRules

    if buffer is None:
        hailo_logger.warning("Received None buffer.")
        return

    if not hasattr(user_data, "smart_desk_rules"):
        user_data.smart_desk_rules = SmartDeskRules(
            confidence_threshold=0.50,
            empty_seconds=10,
            print_interval=1.0,
        )

    user_data.increment()

    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)

    normalized = []

    for detection in detections:
        label = detection.get_label()
        confidence = detection.get_confidence()

        box = None
        try:
            bbox = detection.get_bbox()
            box = [
                round(float(bbox.xmin()), 3),
                round(float(bbox.ymin()), 3),
                round(float(bbox.xmax()), 3),
                round(float(bbox.ymax()), 3),
            ]
        except Exception:
            box = None

        track_id = None
        try:
            tracks = detection.get_objects_typed(hailo.HAILO_UNIQUE_ID)
            if len(tracks) == 1:
                track_id = tracks[0].get_id()
        except Exception:
            track_id = None

        event = {
            "label": label,
            "confidence": float(confidence),
            "box": box,
        }

        if track_id is not None:
            event["track_id"] = track_id

        normalized.append(event)

    user_data.smart_desk_rules.handle(
        normalized,
        backend_name="hailo_hat_plus_2",
    )

    return

Run it:

cd ~/hailo-apps
source setup_env.sh

python3 hailo_apps/python/pipeline_apps/detection/smart_desk_detection.py \
  --input rpi --hef-path yolov6n \
  --width 320 --height 240

Expected output when real objects are in frame:

Backend: hailo_hat_plus_2
  detected person confidence=0.91 box=[...]
  detected bottle confidence=0.83 box=[...]
LOCAL ACTION: Person present -> mark workspace occupied
LOCAL ACTION: Bottle detected -> inventory item present

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

Contributors