Auto-Recycler - Puts Paper, PMD and Rest in Different Cans

by thibault-verrue in Circuits > Raspberry Pi

99 Views, 1 Favorites, 0 Comments

Auto-Recycler - Puts Paper, PMD and Rest in Different Cans

447667707_3696936420635074_1625363685805975225_n.jpg

I made an Auto-Recycler that uses AI to recycle your trash automatically.

Supplies

  • Raspberry Pi
  • 3 Servo Motors
  • A Servo Motor Driver
  • 3 Ultrasonic Sensors
  • 3 Trash Cans
  • A Breadboard

Gather and Prepare Your Data

image_2024-06-11_095515195.png
image_2024-06-11_102705067.png
image_2024-06-11_103155850.png

Gathering

You can do 2 things to gather your data.

  • You can download datasets on internet on websites like Kaggle or Roboflow.
  • You can make your own data and take enough pictures of trash.


Preparing

If you think you have enough data (around 1000 pictures per class would be fine), you make a Roboflow account and annotate your data (picture 1). You upload all your pictures and annotate it by making a box around the trash kind. You also give it a class name picture 2).

After you are done you go to 'Versions' and click on 'Create New Version' (picture 3). You chose certain augmentations you want and then create it. Then after this export your dataset to to a YoloV8 zip.

Training Your Model

Here I will say what you need to do to train you model step by step. Make a jupyter notebook file, name it choosename.ipynb:


First step: importing the needed library's:

from ultralytics import YOLO
from skimage.io import imread


Second step: defining a model to start training:

#define the yolo model
model = YOLO("yolov8n.pt")


Third step: chose an amount of epochs and start training (here I did 80):

# Train the model
results = model.train(data="data.yaml" ,epochs=80, imgsz=480, batch=32)


This is all you need to do to train your model.

If your model is not working good after this, you can continue training using this model

model = YOLO("path/to/best.pt")
results = model.train(data="data.yaml" ,epochs=amountepochs, imgsz=480, batch=32)

Connecting Supplies

image_2024-06-11_110502318.png
image_2024-06-11_111053504.png

In all 3 lids of you put the sensor in the middle as you see on the picture and the motor at the back. with your motors you get a attachment thing and tape a longer stick to it like you see on the picture. you connect this to your motor.

Write Code for AI Side

The following code is what you need to run your AI and see camera feed.

The send_data_to_server function is needed to send your AI output to your Raspberry pi.


import cv2
import math
import socket
import threading
from ultralytics import YOLO


def send_data_to_server(ip, port, message):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((ip, port))
        s.sendall(message.encode())
        data = s.recv(1024)
    print('Received', repr(data))

def main():
    cap = cv2.VideoCapture(0)
    cap.set(3, 640)
    cap.set(4, 480)
    raspberry_pi_ip = '192.168.168.167'  # Replace with your Raspberry Pi's IP address
    port = 65432

    # Load model
    model = YOLO("AI/runs/detect/train5/weights/best.pt")

    # Object classes
    classNames = ['PMD', 'Paper', 'Rest']
    class_name = "None"  # Initialize with a default value

    while True:
        success, img = cap.read()
        if not success:
            break

        results = model(img, stream=True)

        # Process coordinates
        for r in results:
            boxes = r.boxes
            if boxes:
                # Reset class_name for this frame
                class_name = "None"

            for box in boxes:
                # Bounding box
                x1, y1, x2, y2 = map(int, box.xyxy[0])

                # Draw bounding box
                cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 255), 3)

                # Confidence
                confidence = math.ceil(box.conf[0] * 100) / 100
                print(f"Confidence: {confidence}")

                # Class name
                cls = int(box.cls[0])
                class_name = classNames[cls]
                print(f"Class name: {class_name}")

                # Display object details
                org = (x1, y1 - 10)
                font = cv2.FONT_HERSHEY_SIMPLEX
                font_scale = 0.5
                color = (255, 0, 0)
                thickness = 2

                cv2.putText(img, f"{class_name} {confidence:.2f}", org, font, font_scale, color, thickness)

        # Show the image with detections
        cv2.imshow('Webcam', img)

        key = cv2.waitKey(1) & 0xFF
        if key == ord('q'):
            break
        elif key == 32:  # Space key pressed  
            threading.Thread(target=send_data_to_server, args=(raspberry_pi_ip, port, class_name)).start()

    cap.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    main()

Write Code for Server Side

import socket
from adafruit_servokit import ServoKit
import time
import RPi.GPIO as GPIO
import smbus

SENSORS = [
    {'TRIG': 23, 'ECHO': 24},
    {'TRIG': 27, 'ECHO': 22},
    {'TRIG': 5,  'ECHO': 6}
]

# LCD
I2C_ADDR = 0x27  # I2C device address for the LCD
LCD_WIDTH = 16    # Maximum characters per line
LCD_CHR = 1
LCD_CMD = 0
LCD_LINE_1 = 0x80
LCD_LINE_2 = 0xC0
LCD_BACKLIGHT = 0x08
ENABLE = 0b00000100
E_PULSE = 0.0005
E_DELAY = 0.0005

# Setup GPIO mode
GPIO.setmode(GPIO.BCM)

# Initialize ServoKit with the detected I2C address
kit = ServoKit(channels=16)  # Use the correct I2C address for the PCA9685

class LCD:
    def __init__(self, i2c_addr):
        self.i2c_addr = i2c_addr
        self.bus = smbus.SMBus(1)

    def init_LCD(self):
        try:
            self.send_instruction(0x33)
            self.send_instruction(0x32)
            self.send_instruction(0x28)
            self.send_instruction(0x0C)
            self.send_instruction(0x06)
            self.clear_display()
        except Exception as e:
            print("LCD initialization failed:", e)

    def send_byte_with_e_toggle(self, bits):
        try:
            self.bus.write_byte(I2C_ADDR, (bits | ENABLE))
            time.sleep(E_PULSE)
            self.bus.write_byte(I2C_ADDR, (bits & ~ENABLE))
            time.sleep(E_DELAY)
        except Exception as e:
            print("Error sending byte with E toggle:", e)

    def set_data_bits(self, bits, mode):
        upper_nibble = (bits & 0xF0) | mode | LCD_BACKLIGHT
        lower_nibble = ((bits << 4) & 0xF0) | mode | LCD_BACKLIGHT
        self.send_byte_with_e_toggle(upper_nibble)
        self.send_byte_with_e_toggle(lower_nibble)

    def send_instruction(self, byte):
        self.set_data_bits(byte, LCD_CMD)

    def send_character(self, byte):
        self.set_data_bits(byte, LCD_CHR)

    def send_string(self, message, line, pos):
        if line == 1:
            self.send_instruction(LCD_LINE_1 + pos)
        elif line == 2:
            self.send_instruction(LCD_LINE_2 + pos)
        for char in message:
            self.send_character(ord(char))

    def clear_display(self):
        self.send_instruction(0x01)
        time.sleep(0.002)

def move_servo_slowly(servo, start_angle, end_angle, step=1, delay=0.01):
    if start_angle < end_angle:
        for angle in range(start_angle, end_angle + 1, step):
            kit.servo[servo].angle = angle
            time.sleep(delay)
    else:
        for angle in range(start_angle, end_angle - 1, -step):
            kit.servo[servo].angle = angle
            time.sleep(delay)

def measure_distance(sensor):
    TRIG = sensor['TRIG']
    ECHO = sensor['ECHO']
   
    # Setup GPIO pins
    GPIO.setup(TRIG, GPIO.OUT)
    GPIO.setup(ECHO, GPIO.IN)

    # Ensure trigger is low
    GPIO.output(TRIG, False)
    time.sleep(2)  # Let the sensor settle

    # Send 10us pulse to trigger
    GPIO.output(TRIG, True)
    time.sleep(0.00001)
    GPIO.output(TRIG, False)
   
    # Measure the time between sending and receiving the echo
    start_time = time.time()
    stop_time = time.time()
   
    while GPIO.input(ECHO) == 0:
        start_time = time.time()
   
    while GPIO.input(ECHO) == 1:
        stop_time = time.time()
   
    # Calculate distance
    elapsed_time = stop_time - start_time
    distance = (elapsed_time * 34300) / 2  # Speed of sound: 34300 cm/s
   
    return distance

def start_server(host='0.0.0.0', port=65432):
    lcd = LCD(I2C_ADDR)
    lcd.init_LCD()
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, port))
        s.listen()
        print(f"Server listening on {host}:{port}")
        while True:
            conn, addr = s.accept()
            with conn:
                print(f"Connected by {addr}")
                while True:
                    data = conn.recv(1024)
                    if not data:
                        break
                    type = data.decode()
                    print(f"Received: {type}")
                    if type == "Rest":
                        distanceRest = measure_distance(SENSORS[1])
                        print(distanceRest)
                        if distanceRest >= 25:
                            lcd.send_string("Type: Rest", 1, 0)
                            move_servo_slowly(2, 0, 140)
                            time.sleep(3)
                            kit.servo[2].angle = 0
                            lcd.clear_display()
                        else:
                            lcd.send_string("Please empty:", 1, 0)
                            lcd.send_string("Rest", 2, 5)
                            time.sleep(5)
                            lcd.clear_display()

                    elif type == "PMD":
                        distancePMD = measure_distance(SENSORS[0])
                        if distancePMD >= 25:
                            lcd.send_string("Type: PMD", 1, 0)
                            move_servo_slowly(1, 0, 140)
                            time.sleep(3)
                            kit.servo[1].angle = 0
                            lcd.clear_display()
                        else:
                            lcd.send_string("Please empty:", 1, 0)
                            lcd.send_string("PMD", 2, 5)
                            time.sleep(5)
                            lcd.clear_display()
                    elif type == "Paper":
                        distancePaper = measure_distance(SENSORS[2])
                        if distancePaper >= 25:
                            lcd.send_string("Type: Paper", 1, 0)
                            move_servo_slowly(0, 0, 140)
                            time.sleep(3)
                            kit.servo[0].angle = 0
                            lcd.clear_display()
                        else:
                            lcd.send_string("Please empty:", 1, 0)
                            lcd.send_string("Paper", 2, 5)
                            time.sleep(5)
                            lcd.clear_display()
                    break
                print(f"Connection with {addr} closed")

if __name__ == "__main__":
    start_server()

Mounting Everything Together

448255121_776931614274098_2728492831828466301_n.jpg

step 1

  • connect ground and 5v pins to one side of the breadboard.
  • connect the sda and scl pins to the other side.

step 2

  • connect the sda and scl pins of the LCD and servo motor driver under the breadboard pins
  • connect the ground and 5v pins of the same devices to the breadboard.

step 3

  • connect all 3 servo motors to the driver
  • connect the pins of the sensors to these pins    
  • TRIG: 23, ECHO: 24
  • TRIG: 27, ECHO: 22
  • TRIG: 5,  ECHO: 6
  • connect the ground and 5V to the breadboard


Test Your Project

If you did everything correct your ready to start