FaceAuth: Easy Login to Study Platforms Using Facial Authentication

by Mykyta Tsykunov in Circuits > Raspberry Pi



photo_2024-06-16_20-47-57.jpg

Imagine you study at a university or school and use a public computer to access your study materials. Each time, you need to type your password to log in. Or imagine your teacher wants to share a solution from their account, requiring them to enter sensitive data on your computer. To make this process easier and more secure, I came up with the idea of a facial authentication AI model. This model can be implemented on any study platform, providing quick access to your study materials on any device.

Easy login is the key to making your study life simpler and more enjoyable!

The idea is to create a device with a Raspberry Pi inside, an LCD display as the output, and a USB camera connected to a laptop as the input. You connect to the Raspberry Pi via Bluetooth and use it to log in to a study platform. Additionally, I plan to create a website to demonstrate the working model as if it were a real study platform. For this I'm going to use Flask, since you don't need much experience to build a simple website with this Python framework.
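As a taste of how little Flask needs to serve a page (a minimal sketch, not the final app; the route and page text are placeholders):

```python
from flask import Flask

app = Flask(__name__)

@app.route('/')
def index():
    # Placeholder page; the real app renders templates and talks to the Pi
    return 'FaceAuth demo'
```

Run it with `flask --app app run` and open http://127.0.0.1:5000/ to see the page.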

Supplies

photo_2024-06-16_20-42-26.jpg
photo_2024-06-16_20-42-25.jpg
photo_2024-06-16_20-42-27.jpg
photo_2024-06-16_20-42-24.jpg

Prepare Your Data

chrome_5vPFXbdg5S.png
chrome_gK95EbjzYW.png

The first thing we need is to prepare our data. You can find ready-made face datasets on kaggle.com; I'll share links to the datasets I found, but you can use your own as well. I used the dataset 70,000 Real Faces 6, but I took only 5k images for annotation. You can also check these datasets: Human Faces (7k), Flickr-Faces-HQ Dataset (Nvidia) - Part 5 (8k) and 70,000 Real Faces 3.
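Picking a 5k subset out of a large download can be done with a short script (a sketch; `sample_subset` is my own helper name, and the source/destination paths are placeholders):

```python
import random
import shutil
from pathlib import Path

def sample_subset(src_dir, dst_dir, k, seed=42):
    """Copy a random sample of k images from src_dir into dst_dir."""
    src, dst = Path(src_dir), Path(dst_dir)
    dst.mkdir(parents=True, exist_ok=True)
    images = sorted(p for p in src.iterdir()
                    if p.suffix.lower() in {'.jpg', '.jpeg', '.png'})
    random.seed(seed)  # fixed seed so the subset is reproducible
    picked = random.sample(images, min(k, len(images)))
    for p in picked:
        shutil.copy2(p, dst / p.name)
    return len(picked)
```

Call it as, for example, `sample_subset('70k_faces', 'faces_5k', 5000)`.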

Now that we have our data, it's time to upload it to Roboflow. You can create an account for free, but I would recommend choosing the Starter Plan trial for 14 days: you don't need to pay for it, and you'll get 10,000 Auto Label credits that you can use to label your faces instead of annotating all of them manually.

Annotation of Your Data

chrome_kz38IEypYd.png
chrome_lyvZxlXYeP.png
chrome_beQoZT2Hux.png
chrome_AuulvnhQPS.png
chrome_rj8Uvjob08.png
chrome_74DNPxwfPq.png
chrome_lyfqjoM5Qi.png
chrome_2LxX0PL6lC.png
chrome_uB3i2ZtmKZ.png

Let's create a new Roboflow project; I called it facedetect. I chose face for the Annotation Group, and it's really important to choose Object Detection for this one, because we first need to detect the user's face before applying the classification model.

Our next step is to upload our data. You can upload individual files or an entire folder; I chose the latter. Once all the images are uploaded, let's start annotating. We can do this manually or use the Auto Label feature (remember, the Starter Plan allows us to auto-label up to 10,000 images). I used Auto Label for most of my data and manually annotated some images. In total, I ended up with a dataset containing 5,204 images, but we're not done yet.

The next step is to click Generate and perform the Augmentation. In simple terms, augmentation creates more data from our dataset by applying transformations like shearing, rotating, and changing brightness. I'll show you my settings, but feel free to add more if you want. It's better to keep it simple, as it's important to understand what enhances our model versus what might reduce accuracy. With augmentation, the total number of images in my dataset came to almost 20k.
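To make augmentation concrete, here is roughly what it does to a single image (a sketch using Pillow; Roboflow applies the same kinds of transforms on its side):

```python
from PIL import Image, ImageEnhance

def augment(img):
    """Return a few transformed copies of one image: rotated, brighter, darker."""
    return [
        img.rotate(15, expand=True),                # small rotation
        ImageEnhance.Brightness(img).enhance(1.4),  # brighter copy
        ImageEnhance.Brightness(img).enhance(0.6),  # darker copy
    ]
```

Each source image becomes several slightly different training images, which is how a few thousand originals can grow to nearly 20k.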

Now we can finally click Create. After creating a version of our dataset, click Export Dataset, choose YOLOv8, select "show download code" and click Continue. You'll see a code snippet that you need to copy and paste into your project.

Training a Face Detection Model

To train our face detection model, we will use Ultralytics. If you haven't installed it, please do so (`pip install ultralytics==8.2.26`). Keep in mind that different versions might have slightly different syntax; I am using Ultralytics 8.2.26, so you can install this exact version to avoid some errors.

First, copy the code that you get when you export your dataset and run it. You'll see a new folder with the name of your project; this is our dataset. Now let's train our model. I'm going to use CUDA, which lets you train the model on the GPU instead of the CPU, making the process much faster. If you don't have CUDA, you need to install a CUDA-enabled build of PyTorch; I am using PyTorch 2.0.1+cu117. You can check whether the GPU is visible with `python -c "import torch; print(torch.cuda.is_available())"`.

Here is the code to train the model:

from ultralytics import YOLO


def main():
    # Load the YOLO model
    model = YOLO(model="yolov8s.pt")
    # model = YOLO(model="yolov8n.pt")  # smaller, faster alternative
   
    # Set the device to GPU
    device = 'cuda'
   
    # Train the model on the GPU
    model.train(
        data="path_to_your_project\\facedetect-1\\data.yaml",
        epochs=20,
        imgsz=640,  # training image size; an int is the expected form here
        verbose=True,
        batch=8,
        device=device
    )
   
    # Validate the model
    model.val()
   
    # Export the model
    model.export()


if __name__ == '__main__':
    main()

After training, you can check the runs/detect directory and see several training runs. Choose the one that contains a folder named weights and find a file called best.pt. This is our final model. For me, the best model is located in /runs/detect/train5/weights/best.pt. Now, let's test it:

from ultralytics import YOLO

# Load your trained model
model = YOLO("./runs/detect/train5/weights/best.pt")

# source='0' opens the default webcam; show=True draws detections live
results = model.predict(source='0', show=True)
print(results)

This code will run your camera, and you can test your model. If it detects your face, then we can move on.
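If you end up with several training runs, a small helper can pick the most recent best.pt instead of hard-coding train5 (a sketch; `latest_best` is my own helper name):

```python
from pathlib import Path

def latest_best(root='runs/detect'):
    """Return the most recently modified best.pt among all training runs."""
    candidates = Path(root).glob('train*/weights/best.pt')
    return max(candidates, key=lambda p: p.stat().st_mtime, default=None)
```

For example, `YOLO(str(latest_best()))` would load whichever run finished last.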

Creating a Database and Connecting

MySQLWorkbench_DOOblH6BNQ.png
MySQLWorkbench_OlN3mFHc0B.png

Let's create a MySQL database to store our users. I use MySQL Workbench to create a local database so I can connect to it from Python. First, we create a new schema; next, we create a new table users with the columns you can see in the screenshot.
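Based on the columns the code below uses, the table looks roughly like this (a sketch; adjust the types and lengths to match your own screenshot):

```sql
CREATE TABLE users (
    userId    INT AUTO_INCREMENT PRIMARY KEY,
    firstname VARCHAR(50)  NOT NULL,
    lastname  VARCHAR(50)  NOT NULL,
    password  VARCHAR(255) NOT NULL,
    type      VARCHAR(10)  NOT NULL,   -- 'Student' or 'Teacher'
    honorific VARCHAR(10)  DEFAULT '',
    recorded  TINYINT(1)   DEFAULT 0   -- set to 1 once a face model is trained
);
```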

Now it's time to write some classes

Database.py:

from mysql import connector
import os

class Database:


    # 1. Open a connection; a static helper so every query method can reuse it
    @staticmethod
    def __open_connection():
        try:
            db = connector.connect(
                option_files=os.path.abspath(
                    os.path.join(os.path.dirname(__file__), "../config.py")
                ),
                autocommit=False,
            )
            if "AttributeError" in (str(type(db))):
                raise Exception("invalid database parameters in config")
            cursor = db.cursor(dictionary=True, buffered=True)  # lazy loaded
            return db, cursor
        except connector.Error as err:
            if err.errno == connector.errorcode.ER_ACCESS_DENIED_ERROR:
                print("Error: access to the database was denied")
            elif err.errno == connector.errorcode.ER_BAD_DB_ERROR:
                print("Error: the database was not found")
            else:
                print(err)
            return


    # 2. Executes READS
    @staticmethod
    def get_rows(sqlQuery, params=None):
        result = None
        db, cursor = Database.__open_connection()
        try:
            cursor.execute(sqlQuery, params)
            result = cursor.fetchall()
            cursor.close()
            if result is None:
                print(ValueError("No results found. [DB Error]"))
            db.close()
        except Exception as error:
            print(error)  # development message
            result = None
        finally:
            return result


    @staticmethod
    def get_one_row(sqlQuery, params=None):
        db, cursor = Database.__open_connection()
        try:
            cursor.execute(sqlQuery, params)
            result = cursor.fetchone()
            cursor.close()
            if result is None:
                raise ValueError("No results found. [DB Error]")
        except Exception as error:
            print(error)  # development message
            result = None
        finally:
            db.close()
            return result


    # 3. Executes INSERT, UPDATE, DELETE with PARAMETERS
    @staticmethod
    def execute_sql(sqlQuery, params=None):
        result = None
        db, cursor = Database.__open_connection()
        try:
            cursor.execute(sqlQuery, params)
            db.commit()
            # an INSERT returns its lastrowid; UPDATE/DELETE leave it at 0
            result = cursor.lastrowid
            if result == 0:  # an UPDATE or a DELETE
                if cursor.rowcount == -1:  # there is an error in the SQL
                    raise Exception("Error in SQL")
                elif cursor.rowcount == 0:
                    # nothing changed: the WHERE matched no rows, or the data was unchanged
                    result = 0
                else:
                    result = cursor.rowcount  # number of rows affected
        except connector.Error as error:
            db.rollback()
            result = None
            print(f"Error: data not saved. {error.msg}")
        finally:
            cursor.close()
            db.close()
            return result

Here we create some methods to connect to our database. You also need to create config.py:

[connector_python]
user = root
host = 127.0.0.1
port = 3306
password = root
database = projectone


[application_config]
driver = 'SQL Server'

You need to fill in the name of the database, the user, the password and the host (127.0.0.1 by default).
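This is the INI format that mysql-connector's `option_files` parameter reads (even though the file is named config.py, as above). You can sanity-check your file with the standard library's configparser:

```python
import configparser
import tempfile

CONFIG = """\
[connector_python]
user = root
host = 127.0.0.1
port = 3306
password = root
database = projectone
"""

# Write the snippet to a temp file and read it back, the same way
# mysql-connector parses the file passed to option_files.
with tempfile.NamedTemporaryFile('w', suffix='.py', delete=False) as f:
    f.write(CONFIG)
    path = f.name

cfg = configparser.ConfigParser()
cfg.read(path)
print(cfg['connector_python']['database'])  # projectone
```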

Let's also create DataRepository.py and create all the methods we're going to use:

from .Database import Database


class DataRepository:
    @staticmethod
    def read_users():
        sql = "SELECT * FROM users"
        return Database.get_rows(sql)


    @staticmethod
    def create_user(firstname, lastname, password, type, honorific="", recorded=0):
        sql = "INSERT into users (firstname, lastname, password, type, honorific, recorded) VALUES (%s, %s, %s, %s, %s, %s)"
        params = [firstname, lastname, password, type, honorific, recorded]
        return Database.execute_sql(sql, params)
   
    @staticmethod
    def is_recorded(user_id):
        sql = "SELECT recorded FROM users WHERE userId=%s"
        params = [user_id]
        return Database.get_rows(sql, params)
   
    @staticmethod
    def get_password(user_id):
        sql = "SELECT password FROM users WHERE userId=%s"
        params = [user_id]
        return Database.get_rows(sql, params)
   
    @staticmethod
    def set_recorded(user_id):
        sql = "UPDATE users SET recorded = 1 WHERE userId = %s"
        params = [user_id]
        return Database.execute_sql(sql, params)
   
    @staticmethod
    def get_name(user_id):
        sql = "SELECT firstname, honorific FROM users WHERE userId=%s"
        params = [user_id]
        return Database.get_rows(sql, params)

These are just the simple SQL queries our program needs; if you know some basic SQL, they'll be easy to follow.
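Note that every query passes its parameters separately from the SQL string, which is what protects against SQL injection. You can try out the same pattern without a MySQL server using Python's built-in sqlite3 (illustration only; the project itself uses mysql-connector):

```python
import sqlite3

# In-memory database just to demonstrate the pattern
db = sqlite3.connect(':memory:')
db.execute("CREATE TABLE users (userId INTEGER PRIMARY KEY, firstname TEXT, recorded INTEGER)")

# Parameters are never string-formatted into the SQL
db.execute("INSERT INTO users (firstname, recorded) VALUES (?, ?)", ("Ada", 0))
row = db.execute("SELECT recorded FROM users WHERE userId=?", (1,)).fetchone()
print(row[0])  # the freshly created user is not recorded yet
```

The only cosmetic difference: sqlite3 uses `?` placeholders where mysql-connector uses `%s`.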

Making a Website

chrome_2WBbhYQ5Hv.png
chrome_66BU6edcU3.png
chrome_c54U8Q5kVS.png
chrome_LCBDOMo7aX.png
chrome_SeNxjfN29C.png
chrome_y2ZpGVqTG9.png

For this part, I'm going to make a simple website to show how it works. I think it could easily be implemented on any study platform, e.g. Leho.

Here is the code for app.py:

from repositories.DataRepository import DataRepository
from flask import Flask, request, jsonify, redirect,render_template, url_for, flash, session
from flask_cors import CORS
import secrets
import sys
import os
import threading
import socket
from queue import Queue
from BLE_client import run


# Creating two Queues for communication between threads.
tx_q = Queue()
rx_q = Queue()


targetDeviceName=None
targetDeviceMac="D8:3A:DD:D9:6C:7F"


sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))


app = Flask(__name__)
CORS(app)


app.secret_key = secrets.token_hex(16)

@app.route('/')
def index():
    return redirect("/signup", code=302)

# address to get all customers. (GET method)
@app.route('/users', methods=['GET'])
def users():
    # tx_q.put('   All Users    ')
    users = DataRepository.read_users()
    users.reverse()
    return render_template('users.html', users=users)

@app.route('/signup', methods=['GET'])
def signup():
    tx_q.put('  You need to       Sign up     ')
    return render_template('signup.html')


@app.route('/login/<user_id>', methods=['POST'])
def login(user_id):
    data = DataRepository.is_recorded(user_id)
    is_recorded = data[0]['recorded']


    if is_recorded == 0:
        return redirect(url_for('record_user', user_id=user_id))
    elif is_recorded == 1:
        return redirect(url_for('login_user', user_id=user_id))
    else:
        return redirect(url_for('users'))
   
@app.route('/login/<user_id>', methods=['GET'])
def login_user(user_id):
    data = DataRepository.is_recorded(user_id)
    is_recorded = data[0]['recorded']


    if is_recorded == 0:
        return redirect(url_for('record_user', user_id=user_id))
    elif is_recorded == 1:
        return render_template('login.html', user_id=user_id)
    else:
        return redirect(url_for('users'))


@app.route('/record/<user_id>', methods=['GET'])
def record_user(user_id):
    tx_q.put(f"  Please Enter   Your Password  ")


    return render_template('password.html', user_id=user_id)


is_logged_in = False
@app.route('/auth', methods=['POST'])
def auth_user():
    try:
        global is_logged_in


        tx_q.put('   Look into       the camera   ')


        data = request.get_json()  # Access the JSON data sent by the AJAX request
        user_id = data['userId']  # Extract the user ID


        response = DataRepository.get_name(user_id)


        task_thread = threading.Thread(target=run_task, args=("PREDICT", user_id))
        task_thread.start()


        task_thread.join()


        if not is_logged_in:
            print("Couldn't login")
            tx_q.put(" Couldn't login    Try again    ")
        else:
            honorific = f"{response[0]['honorific']} " if response[0]['honorific'] not in ('', '-') else ''
            print(1, honorific, 1)
            tx_q.put(f"Welcome {honorific}{response[0]['firstname']}")


        return jsonify({'message': is_logged_in})
    except Exception as e:
        return jsonify({'error': str(e)}), 500  # Return error response with status code


@app.route('/record/<user_id>', methods=['POST'])
def check_password(user_id):
    user_password = request.form['userPassword']
    data = DataRepository.get_password(user_id)


    if user_password == data[0]['password']:
        tx_q.put(f"  Let's record     Your face    ")
        return render_template('record.html', user_id=user_id)
    else:
        tx_q.put(f"   Incorrect        password    ")
        return render_template('password.html', user_id=user_id, error="Incorrect password")

def run_task(task_type, user_id):
    try:
        global is_logged_in
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.connect(('127.0.0.1', 65432))
        client.send(f"{task_type} {user_id}".encode('utf-8'))
        response = client.recv(1024).decode('utf-8')
       
        if task_type == "PREDICT":
            # NOTE: the reply is a string, so any non-empty response is truthy;
            # make sure the recognition server's "failed" reply converts to False here
            is_logged_in = response
       
        client.close()
        return is_logged_in
    except Exception as e:
        return f"Exception occurred: {e}"


   
@app.route('/recording', methods=['POST'])
def recording():
    try:
        tx_q.put(f" Please rotate  your face a bit ")


        data = request.get_json()  # Access the JSON data sent by the AJAX request
        user_id = data['userId']  # Extract the user ID


        task_thread = threading.Thread(target=run_task, args=("CAPTURE", user_id))
        task_thread.start()


        task_thread.join()


        return jsonify({'message': 'Ok'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500  # Return error response with status code
   
@app.route('/split', methods=['POST'])
def split():
    try:
        tx_q.put(f"Making a dataset")


        data = request.get_json()
        class_name = data['className']
        task_thread = threading.Thread(target=run_task, args=("SPLIT", class_name))


        task_thread.start()


        task_thread.join()


        return jsonify({'message': 'Ok'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
   
@app.route('/train', methods=['GET'])
def train():
    try:
        tx_q.put(f"Training a model")


        task_thread = threading.Thread(target=run_task, args=("TRAIN", None))
        task_thread.start()


        task_thread.join()


        return jsonify({'message': 'Ok'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500  # Return error response with status code
   
@app.route('/best', methods=['POST'])
def best():
    try:
        tx_q.put(f"  Choosing the     best model   ")


        data = request.get_json()  # Access the JSON data sent by the AJAX request
        user_id = data['userId']  # Extract the user ID


        task_thread = threading.Thread(target=run_task, args=("BEST", user_id))
        task_thread.start()


        task_thread.join()


        DataRepository.set_recorded(user_id)


        flash('trained', 'trained')


        tx_q.put(f"   Model has     been trained!  ")
       
        redirect_url = url_for('users')
        return jsonify({'redirect_url': redirect_url})
    except Exception as e:
        return jsonify({'error': str(e)}), 500  # Return error response with status code
   
# address to ADD a new customer (POST method for your FORM data)
@app.route('/users', methods=['POST'])
def add_user():
    # Use the data_from_form to get the data from the form
    # Use the name attributes from the <input> tags in your form as key
    honorific = request.form.get('honorific', '')
   
    the_new_id = DataRepository.create_user(request.form['userFirstName'], request.form['userLastName'], request.form['userPassword'], request.form['type'], honorific, 0)


    the_new_name = f"{request.form['userFirstName']} {request.form['userLastName']}"
    flash(the_new_name, 'name')
    flash(the_new_id, 'id')


    tx_q.put(f"User {request.form['userFirstName']} added with id {the_new_id}")
   
    return redirect(url_for('users'))


def init_ble_thread():
    # Creating a new thread for running a function 'run' with specified arguments.
    ble_client_thread = threading.Thread(target=run, args=(
        rx_q, tx_q, targetDeviceName, targetDeviceMac), daemon=True)
    # Starting the thread execution.
    ble_client_thread.start()


if __name__ == '__main__':
    init_ble_thread()


    tx_q.put('   Welcome to       FaceAuth    ')


    app.run(host='127.0.0.1', port=5000, debug=True)
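run_task above expects a helper server listening on 127.0.0.1:65432 that performs the requested task and replies; the real one (which runs the camera and training) comes in a later step. Its contract can be sketched with a toy server (hypothetical handler, on a throwaway port):

```python
import socket
import threading

ready = threading.Event()

def serve_once(host='127.0.0.1', port=65433):
    """Accept one connection and acknowledge the requested task."""
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.bind((host, port))
    srv.listen(1)
    ready.set()  # the client may connect now
    conn, _ = srv.accept()
    task = conn.recv(1024).decode('utf-8')        # e.g. "PREDICT 7"
    task_type, _, user_id = task.partition(' ')
    conn.send(f"OK {task_type}".encode('utf-8'))  # placeholder reply
    conn.close()
    srv.close()

t = threading.Thread(target=serve_once)
t.start()
ready.wait()

# This mirrors what run_task does on the Flask side
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 65433))
client.send(b"PREDICT 7")
reply = client.recv(1024).decode('utf-8')
client.close()
t.join()
print(reply)  # OK PREDICT
```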

This is just some basic routing. Here I also connect to my Raspberry Pi over Bluetooth, and I import socket as well; I'll show you why a bit later. Let's build all the pages. The first page is users.html:

<!DOCTYPE html>
<html lang="en">
    <head>  <title>All Users</title>   <meta charset="UTF-8">  <meta name="format-detection" content="telephone=no"> <link rel="stylesheet" href="static/css/style.min.css"><link rel="shortcut icon" href="static/img/logo/icon.svg"><meta name="viewport" content="width=device-width, initial-scale=1.0"></head>
   <body>
      <div class="users">
         {% with messages = get_flashed_messages(with_categories=true) %}
            {% if messages %}
               {% if messages[0][0] == 'name' %}
                  <div class="users__popup">
                        <div class="users__added">
                              <div class="users__check">
                                 <span></span>
                                 <picture><source srcset="static/img/icons/check.svg" type="image/webp"><img src="static/img/icons/check.svg" alt=""></picture>
                              </div>
                              <div class="users__text">
                                 User <b>{{ messages[0][1] }}</b> has been successfully added with the id <b>{{ messages[1][1] }}</b>!
                              </div>
                        </div>
                  </div>
               {% elif messages[0][0] == 'trained' %}
                  <div class="users__popup">
                     <div class="users__added">
                           <div class="users__check">
                              <span></span>
                              <picture><source srcset="static/img/icons/check.svg" type="image/webp"><img src="static/img/icons/check.svg" alt=""></picture>
                           </div>
                           <div class="users__text">
                              Model has been trained! You can now login via your face.
                           </div>
                     </div>
               </div>
               {% endif %}
            {% endif %}
        {% endwith %}
         <div class="users__body">
            <div class="users__top">
               <a href="/signup" class="users__btn">Sign up</a>
               <div class="users__title"><h1>All Users</h1></div>
               <a href="/signup" class="users__btn">Sign up</a>
            </div>
            <ul class="users__tabs">
               <li class="users__tab _active">Student</li>
               <li class="users__tab _active">Teacher</li>
            </ul>
            <ul class="users__list">
               {% for user in users %}
                  <li class="users__item {{ user.type }}">
                     <form action="{{ url_for('login', user_id=user.userId) }}" method="POST" class="users__content">
                        <div class="users__main">
                           {% if user.honorific %}
                              <div class="users__honorific">
                                    {{ user.honorific }}
                              </div>
                           {% endif %}
                           <div class="users__name">
                              {{ user.firstname }} {{ user.lastname }}
                           </div>
                        </div>                    
                        <div class="users__info">
                           <div class="users__type">
                              {{ user.type }}
                           </div>
                           <button type="submit" class="users__login">
                              {{ 'Login' if user.recorded else 'Record' }}
                           </button>
                        </div>
                     </form>
                  </li>
               {% endfor %}
            </ul>
            <div class="users__empty {% if not users %}_show{% endif %}">
               <p>No users found</p>
            </div>
         </div>
      </div>
      <script src="static/js/vendors.min.js"></script>
      <script src="static/js/app.min.js"></script>
   </body>
</html>

Here I display all users, and at the top there are some flash popups that appear after certain actions.

Next page is signup.html, so the user can easily sign up:

<!DOCTYPE html>
<html lang="en">
    <head>  <title>Sign Up</title>  <meta charset="UTF-8">  <meta name="format-detection" content="telephone=no"> <link rel="stylesheet" href="static/css/style.min.css">  <link rel="shortcut icon" href="static/img/logo/icon.svg">  <meta name="viewport" content="width=device-width, initial-scale=1.0"></head>
   <body>
      <div class="form">
         <div class="form__body">
            <div class="form__title"><h1>FaceAuth</h1></div>
            <form action="/users" class="form-main" method="post">
               <div class="form-main__input">
                  <input required class="input-anim" type="text" autocomplete="given-name" placeholder="First Name" name="userFirstName" id="userFirstName">
                  <label for="userFirstName"></label>
               </div>
               <div class="form-main__input">
                  <input required class="input-anim" type="text" autocomplete="family-name" placeholder="Last Name" name="userLastName" id="userLastName">
                  <label for="userLastName"></label>
               </div>
               <div class="form-main__input form-main__input_password">
                  <input required class="input-anim" type="password" placeholder="Password" name="userPassword" id="userPassword">
                  <label for="userPassword"></label>
                  <div class="form-main__eye">
                     <svg>
                        <use xlink:href='static/img/sprite.svg#eye'></use>
                     </svg>
                     <svg>
                        <use xlink:href='static/img/sprite.svg#eye-slash'></use>
                     </svg>
                  </div>
               </div>
               <div class="form-main__select">
                  <div class="select" data-name="type" data-state tabindex="0">
                     <div class="select__title" data-default="Who are you?"><span>Who are you?</span></div>
                     <div class="select__content">
                        <div class="select__body">
                           <input type="radio" class="select__input" id="student" value="Student" name="type">
                           <label class="select__label" for="student"><span>I'm a Student</span></label>
                           <span class="select__line"></span>
                           <input type="radio" class="select__input" id="teacher" value="Teacher" name="type">
                           <label class="select__label" for="teacher"><span>I'm a Teacher</span></label>
                        </div>
                     </div>
                  </div>
               </div>
               <div class="form-main__select">
                  <div class="select" data-name="honorific" data-state tabindex="0">
                     <div class="select__title" data-default="Select your title"><span>Select your title</span></div>
                     <div class="select__content">
                        <div class="select__body">
                           <input type="radio" class="select__input" id="mr" value="Mr" name="honorific">
                           <label class="select__label" for="mr"><span>Mr.</span></label>
                           <span class="select__line"></span>
                           <input type="radio" class="select__input" id="ms" value="Ms" name="honorific">
                           <label class="select__label" for="ms"><span>Ms.</span></label>
                           <span class="select__line"></span>
                           <input type="radio" class="select__input" id="notSpecify" value="-" name="honorific">
                           <label class="select__label" for="notSpecify"><span>Prefer to not specify</span></label>
                        </div>
                     </div>
                  </div>
               </div>
               <div class="form-main__bottom">
                  <button type="submit" class="form-main__btn btn">Sign up</button>
               </div>
            </form>
         </div>
      </div>
      <script src="static/js/vendors.min.js"></script>
      <script src="static/js/app.min.js"></script>
   </body>
</html>

Here you type your first name, last name and password, and choose whether you're a student or a teacher. If you're a teacher, you can also choose your honorific: Mr, Ms or Prefer to not specify (-).

Next one is password.html; it is used to ask for the user's password before recording their face:

<!DOCTYPE html>
<html lang="en">
    <head>  <title>Enter your password</title>  <meta charset="UTF-8">  <meta name="format-detection" content="telephone=no"> <link rel="stylesheet" href="/static/css/style.min.css"><link rel="shortcut icon" href="/static/img/logo/icon.svg"><meta name="viewport" content="width=device-width, initial-scale=1.0"></head>
   <body>
      <div class="form">
         <div class="form__body form__body_password">
            <div class="form__title form__title_small"><h1>Enter your password</h1></div>
            <form action="{{ url_for('check_password', user_id=user_id) }}" method="POST" class="form-main">
               <div class="form-main__input form-main__input_password">
                  <input required class="input-anim" type="password" placeholder="Password" name="userPassword" id="userPassword">
                  <label for="userPassword"></label>
                  <div class="form-main__eye">
                     <svg>
                        <use xlink:href='/static/img/sprite.svg#eye'></use>
                     </svg>
                     <svg>
                        <use xlink:href='/static/img/sprite.svg#eye-slash'></use>
                     </svg>
                  </div>
               </div>
               {% if error %}
               <div class="form-main__error">
                  {{ error }}
               </div>
               {% endif %}
               <div class="form-main__bottom form-main__bottom_password">
                  <button type="submit" class="form-main__btn form-main__btn_big btn">Next</button>
               </div>
            </form>
         </div>
      </div>
      <script src="/static/js/vendors.min.js"></script>
      <script src="/static/js/app.min.js"></script>
   </body>
</html>

And now of course record.html, where you can start recording your face and track all the progress:

<!DOCTYPE html>
<html lang="en">
    <head>  <title>Recording</title>   <meta charset="UTF-8">  <meta name="format-detection" content="telephone=no"> <link rel="stylesheet" href="/static/css/style.min.css"><link rel="shortcut icon" href="/static/img/logo/icon.svg"><meta name="viewport" content="width=device-width, initial-scale=1.0"></head>
   <body>
      <div class="record">
         <div class="record__body">
            <div class="record__title"><h1>Let's record your face</h1></div>
            <div class="record__main">
               <div class="record__icon">
                  <img src="/static/img/icons/face.svg" alt="">
                  <img class="_hide" src="/static/img/icons/rotate.svg" alt="">
               </div>
               <div class="record__bottom">
                  <button data-id='{{ user_id }}' class="record__btn">
                     Start recording
                  </button>
               </div>
            </div>
         </div>
      </div>
      <div class="progress _hide">
         <div class="progress__body">
            <div class="progress__title"><h1>Almost there!</h1></div>
            <ul class="progress__steps">
               <li class="progress__step _progress">
                  <div class="progress__icon">
                     <svg>
                        <use xlink:href='/static/img/sprite.svg#folder'></use>
                     </svg>
                  </div>
                  <p class="progress__descr">Making a dataset</p>
               </li>
               <li class="progress__step">
                  <div class="progress__icon">
                     <svg>
                        <use xlink:href='/static/img/sprite.svg#study'></use>
                     </svg>
                  </div>
                  <p class="progress__descr">Training a model</p>
               </li>
               <li class="progress__step">
                  <div class="progress__icon">
                     <svg>
                        <use xlink:href='/static/img/sprite.svg#star'></use>
                     </svg>
                  </div>
                  <p class="progress__descr">Choosing the best model</p>
               </li>
            </ul>
         </div>
      </div>
      <script src="/static/js/vendors.min.js"></script>
      <script src="/static/js/app.min.js"></script>
   </body>
</html>

And last but not least, login.html:

<!DOCTYPE html>
<html lang="en">
    <head>
        <title>Auth</title>
        <meta charset="UTF-8">
        <meta name="format-detection" content="telephone=no">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <link rel="stylesheet" href="/static/css/style.min.css">
        <link rel="shortcut icon" href="/static/img/logo/icon.svg">
    </head>
   <body>
      <div class="auth" data-id="{{ user_id }}">
         <div class="auth__body">
            <div class="auth__title auth__title_lock">
               Look into<br> the camera
            </div>
            <div class="auth__title auth__title_error">
               Oops... Try again
            </div>
            <div class="auth__title auth__title_open">
               Welcome!
            </div>
            <div class="auth__icon">
               <picture><source srcset="/static/img/icons/lock.svg" type="image/webp"><img src="/static/img/icons/lock.svg" alt=""></picture>
            </div>
            <div class="auth__open">
               <div class="auth__key">
                  <picture><source srcset="/static/img/icons/key.svg" type="image/webp"><img src="/static/img/icons/key.svg" alt=""></picture>
               </div>
            </div>
         </div>
      </div>
      <script src="/static/js/vendors.min.js"></script>
      <script src="/static/js/app.min.js"></script>
   </body>
</html>

This page appears when you want to log in as a specific user.

You can download the static folder (images, icons, CSS, JS, fonts) from my GitHub repository, so you don't need to code it all yourself :)

Socket Server

Now that we've finished the frontend, let's talk about the backend and why we import socket. Flask handles each request synchronously: if a route kicks off a long-running job, such as capturing images or training a model, the app can't respond to anything else until that job finishes. By delegating these jobs to a separate socket server, the website keeps accepting connections and answering other requests while the task runs in its own thread, and the server sends the result back once it's done.

Here is my socket server, app.py:

import socket
import threading
from detection import capture_images_with_yolo
from split import split_dataset_by_class
from train_detection import train_detection
from best import get_the_best_model
from predict import predict_user
import os


CONCURRENT_CONNECTIONS = 5
semaphore = threading.Semaphore(CONCURRENT_CONNECTIONS)
lock = threading.Lock()
task_running = False
server = None


def handle_client(client_socket):
    global task_running
    with semaphore:
        try:
            with lock:
                if not task_running:
                    task_running = True
                else:
                    response = "Task is already running. Please try again later."
                    client_socket.send(response.encode('utf-8'))
                    return


            task_info = client_socket.recv(1024).decode('utf-8').split(' ')
            task_type = task_info[0]
            user_id = task_info[1]


            if task_type == "CAPTURE":
                response = capture_images_with_yolo(user_id)
            elif task_type == "SPLIT":
                class_name = user_id  # Assuming user_id is used as class_name for splitting
                split_dataset_by_class(class_name)
                response = "Dataset split finished."
            elif task_type == "TRAIN":
                response = train_detection()
            elif task_type == "BEST":
                get_the_best_model(user_id)
                response = "The best model found"
            elif task_type == "PREDICT":
                response = predict_user(user_id)
            else:
                response = "Unknown task type."


            client_socket.send(response.encode('utf-8'))
        except Exception as e:
            response = f"Exception in handling client: {e}"
            client_socket.send(response.encode('utf-8'))
        finally:
            client_socket.close()
            with lock:
                task_running = False


def start_server(host='127.0.0.1', port=65432):
    global server
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(5)
    print(f"Server listening on {host}:{port}")


    while True:
        client_socket, addr = server.accept()
        print(f"Accepted connection from {addr}")


        client_handler = threading.Thread(target=handle_client, args=(client_socket,))
        client_handler.start()


if __name__ == "__main__":
    try:
        start_server()
    except KeyboardInterrupt:
        print("Stopping the server...")
        if server:
            server.close()
        print("Server stopped.")

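Before looking at the individual tasks, it helps to see the other side of the wire. The Flask routes aren't shown here, but a client talking to this server could look like the sketch below (`send_task` is a hypothetical helper, not part of the project; the host and port match `start_server`'s defaults):

```python
import socket

def send_task(task_type, user_id, host="127.0.0.1", port=65432):
    """Send one 'TASK user_id' message and wait for the server's reply."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((host, port))
        s.send(f"{task_type} {user_id}".encode("utf-8"))
        return s.recv(1024).decode("utf-8")

# A Flask route could call, e.g., send_task("CAPTURE", "42")
# to kick off image capture for user 42 without blocking other requests.
```

Because the server handles each connection in its own thread and rejects a second task while one is running, the website stays responsive even during a long training run.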
The server dispatches a handful of task types; let's walk through the scripts behind them.

detection.py:

import cv2
import os
import numpy as np
from ultralytics import YOLO


def resize_and_pad(image, target_size):
    # Convert image to grayscale
    gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)


    h, w = gray_image.shape[:2]
    scale = target_size / max(h, w)
    new_w, new_h = int(w * scale), int(h * scale)
    resized_image = cv2.resize(gray_image, (new_w, new_h))


    top = (target_size - new_h) // 2
    bottom = target_size - new_h - top  # bottom/right absorb any odd pixel
    left = (target_size - new_w) // 2
    right = target_size - new_w - left


    padded_image = cv2.copyMakeBorder(
        resized_image, top, bottom, left, right,
        borderType=cv2.BORDER_CONSTANT, value=0
    )


    return padded_image


def apply_augmentation(image):
    # Randomly jitter contrast, brightness and saturation
    alpha = 1.0 + np.random.uniform(-0.5, 0.5)   # gain (contrast) for convertScaleAbs
    beta = 0.5 + np.random.uniform(-0.25, 0.25)  # bias (brightness) for convertScaleAbs
    saturation = 0.5 + np.random.uniform(-0.25, 0.25)  # saturation multiplier


    image = cv2.convertScaleAbs(image, alpha=alpha, beta=beta)
    hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    hsv_image[:, :, 1] = np.clip(hsv_image[:, :, 1] * saturation, 0, 255).astype(np.uint8)
    image = cv2.cvtColor(hsv_image, cv2.COLOR_HSV2BGR)


    # Randomly rotate image
    angle = np.random.uniform(-15, 15)
    rows, cols = image.shape[:2]
    M = cv2.getRotationMatrix2D((cols / 2, rows / 2), angle, 1)
    image = cv2.warpAffine(image, M, (cols, rows))


    return image


def capture_images_with_yolo(user_id):
    response = ""
    cap = None  # so the finally block is safe if the camera never opens
    try:
        model = YOLO(r"D:\Downloads\Howest\Semester 2\Project_one\2023-2024-projectone-ctai-NikitosKokos\AI\models\detect\train5\weights\best.pt")
        output_dir = f'AI/dataset/{user_id}'
        os.makedirs(output_dir, exist_ok=True)


        # Attempt to open the video capture device
        cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)


        if not cap.isOpened():
            raise Exception("Error: Could not open webcam.")
       
        # Adjust camera settings
        cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0.25)  # Disable auto exposure (value may vary)
        cap.set(cv2.CAP_PROP_BRIGHTNESS, 0.5)  # Adjust brightness (0.0 to 1.0)
        cap.set(cv2.CAP_PROP_CONTRAST, 0.5)    # Adjust contrast (0.0 to 1.0)
        cap.set(cv2.CAP_PROP_EXPOSURE, -4)     # Adjust exposure (varies by camera)


        fps = 30
        frames_to_capture = 10


        frame_count = 0
        original_image_count = 0
        augmented_image_count = 0


        while original_image_count < 100:
            if cap is not None:
                ret, frame = cap.read()


                if not ret:
                    response = "Error: Could not read frame."
                    break


                frame_count += 1


                if frame_count % (fps // frames_to_capture) == 0:
                    results_list = model(frame)
                    for results in results_list:
                        if results.boxes is not None and len(results.boxes) > 0:
                            for box in results.boxes.xyxy:
                                x1, y1, x2, y2 = box.cpu().numpy().astype(int)
                                cropped_image = frame[y1:y2, x1:x2]
                               
                                # Apply augmentation and save augmented images
                                for _ in range(3):  # Create 3 augmented versions per original image
                                    augmented_image = apply_augmentation(cropped_image)
                                    resized_image = resize_and_pad(augmented_image, 320)
                                    # Distinct prefixes keep augmented files from being overwritten by the originals saved below
                                    augmented_image_path = os.path.join(output_dir, f'aug_{augmented_image_count}.jpg')
                                    cv2.imwrite(augmented_image_path, resized_image)
                                    augmented_image_count += 1


                                # Save original image
                                resized_image = resize_and_pad(cropped_image, 320)
                                original_image_path = os.path.join(output_dir, f'orig_{original_image_count}.jpg')
                                cv2.imwrite(original_image_path, resized_image)
                                original_image_count += 1


                                if original_image_count >= 100:
                                    break


        if not response:
            response = "Ok"


    except Exception as e:
        response = f"Exception occurred: {e}"


    finally:
        if cap is not None and cap.isOpened():
            cap.release()
        cv2.destroyAllWindows()


    return response

Here we use the webcam to capture pictures. The resize_and_pad function scales each image to fit within 320x320 pixels and fills the remaining background with black if the crop isn't square. It also converts the image to grayscale, which lets the model focus on shape rather than color. The apply_augmentation function then creates three augmented variants of every capture, so each user ends up with 100 original face crops plus their augmented versions.
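The letterbox arithmetic is easy to sanity-check without OpenCV. This sketch (`letterbox_dims` is a made-up helper, not part of the project) recomputes the scale and padding; giving the leftover odd pixel to the bottom/right pad keeps the output exactly square:

```python
def letterbox_dims(h, w, target=320):
    """Recompute the scale and padding used when letterboxing to target x target."""
    scale = target / max(h, w)
    new_h, new_w = int(h * scale), int(w * scale)
    top = (target - new_h) // 2
    bottom = target - new_h - top  # absorbs the odd pixel, if any
    left = (target - new_w) // 2
    right = target - new_w - left
    return new_h + top + bottom, new_w + left + right

print(letterbox_dims(480, 640))  # → (320, 320)
print(letterbox_dims(479, 640))  # → (320, 320), even with an odd remainder
```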

split.py:

import os
import shutil
import random


def split_dataset_by_class(class_name, train_ratio=0.7, val_ratio=0.2, test_ratio=0.1):
    dataset_path = r"D:\Downloads\Howest\Semester 2\Project_one\2023-2024-projectone-ctai-NikitosKokos\AI\facerecognition-2"
    dataset_images = r'D:\Downloads\Howest\Semester 2\Project_one\2023-2024-projectone-ctai-NikitosKokos\AI\dataset'
   
    # Create train, val, and test directories if they don't exist
    for split in ['train', 'val', 'test']:
        split_path = os.path.join(dataset_path, split, 'user')
        if not os.path.exists(split_path):
            os.makedirs(split_path)
        else:
            # Clear the directory if it already exists
            for filename in os.listdir(split_path):
                file_path = os.path.join(split_path, filename)
                if os.path.isfile(file_path) or os.path.islink(file_path):
                    os.unlink(file_path)
                elif os.path.isdir(file_path):
                    shutil.rmtree(file_path)
   
    # List all images in the class folder
    class_images = [img for img in os.listdir(os.path.join(dataset_images, class_name)) if img.endswith(('.jpg', '.jpeg', '.png'))]
    random.shuffle(class_images)
   
    # Calculate the number of images for each split
    num_images = len(class_images)
    num_train = int(train_ratio * num_images)
    num_val = int(val_ratio * num_images)
    num_test = num_images - num_train - num_val
   
    # Move images to train, val, and test directories
    for i, img in enumerate(class_images):
        img_path = os.path.join(dataset_images, class_name, img)
        if i < num_train:
            dest_folder = 'train'
        elif i < num_train + num_val:
            dest_folder = 'val'
        else:
            dest_folder = 'test'
       
        dest_path = os.path.join(dataset_path, dest_folder, 'user', img)
        shutil.copy(img_path, dest_path)
   
    print(f"Split user {class_name} images into train ({num_train}), val ({num_val}), and test ({num_test}) sets.")

This function splits a user's images into train, val, and test sets (70/20/10 by default), so we can start training the classification model.
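The ratio arithmetic can be reproduced in isolation for a quick feel for the numbers (`split_counts` is a hypothetical stand-in for the counting inside split_dataset_by_class):

```python
def split_counts(n, train_ratio=0.7, val_ratio=0.2):
    """Mirror the split arithmetic: the test set simply takes whatever remains."""
    num_train = int(train_ratio * n)
    num_val = int(val_ratio * n)
    num_test = n - num_train - num_val
    return num_train, num_val, num_test

print(split_counts(300))  # → (210, 60, 30)
```

Because the test count is computed as the remainder, every image lands in exactly one split even when the ratios don't divide the total evenly.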


Making a Face Recognition Model

chrome_CM0807y6yS.png

First, let's create a new Roboflow project and call it 'facerecognition'. I initially set the Annotation Group to 'unknown'; later we'll add a 'user' class for each individual user. This approach lets us train a separate model per user. We could train one model covering all users instead, but it would take far longer to train and give worse accuracy. Start by building the base dataset: I uploaded around 1,000 images and classified them all as 'unknown'. Then export the dataset and paste it into the project as we did before. Now that we have a new 'facerecognition' folder with our dataset, let's train the model.

train_detection.py:

from ultralytics import YOLO
import torch


def train_detection():
    response = ""
    try:
        # Load the YOLO classification model
        model = YOLO(model="yolov8s-cls.pt")

        # Use the GPU when available, otherwise fall back to CPU
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
       
        # Train the model on the GPU
        model.train(
            data="D:\\Downloads\\Howest\\Semester 2\\Project_one\\2023-2024-projectone-ctai-NikitosKokos\\AI\\facerecognition-2",
            epochs=10,
            imgsz=320,
            # verbose=True,
            verbose=False,
            batch=8,
            device=device
        )
       
        # Validate the model
        model.val()
       
        # Export the model
        model.export()
       
        response = "Training completed successfully."
    except Exception as e:
        response = f"Exception occurred: {e}"
   
    return response

Here we train the classification model; next we have to pick the best run and copy its weights to models/classify/user-id/best.pt

best.py:

import os
import shutil


def get_the_best_model(user_id):
    # Find the folder with the highest number containing weights/best.pt
    classify_runs_dir = r"D:\Downloads\Howest\Semester 2\Project_one\2023-2024-projectone-ctai-NikitosKokos\runs\classify"
    highest_num = -1
    highest_num_dir = None


    for folder_name in os.listdir(classify_runs_dir):
        # Ultralytics names the first run "train" and later ones "train2", "train3", ...
        if folder_name == "train":
            folder_num = 1
        elif folder_name.startswith("train") and folder_name[5:].isdigit():
            folder_num = int(folder_name[5:])
        else:
            continue

        weights_dir = os.path.join(classify_runs_dir, folder_name, "weights")
        best_pt_path = os.path.join(weights_dir, "best.pt")

        if os.path.exists(best_pt_path) and folder_num > highest_num:
            highest_num = folder_num
            highest_num_dir = folder_name


    if highest_num_dir:
        best_pt_path = os.path.join(classify_runs_dir, highest_num_dir, "weights", "best.pt")
        destination_dir = fr"D:\Downloads\Howest\Semester 2\Project_one\2023-2024-projectone-ctai-NikitosKokos\AI\models\classify\{user_id}"
        os.makedirs(destination_dir, exist_ok=True)
        destination_path = os.path.join(destination_dir, "best.pt")
        shutil.copyfile(best_pt_path, destination_path)
        print(f"Copied {best_pt_path} to {destination_path}")
    else:
        print("No valid training directories found with weights/best.pt.")

This function finds the training run with the highest number that contains weights/best.pt and copies that checkpoint to models/classify/user-id/best.pt
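The numeric parsing matters here: comparing folder names as plain strings would pick the wrong run. A self-contained sketch of the same scan (`latest_run` is a made-up name):

```python
def latest_run(folders):
    """Pick the run folder with the highest numeric suffix, as in get_the_best_model."""
    best_num, best = -1, None
    for name in folders:
        if name.startswith("train") and name[5:].isdigit():
            num = int(name[5:])
            if num > best_num:
                best_num, best = num, name
    return best

print(latest_run(["train2", "train10", "train9"]))  # → train10
# A lexicographic max() over the same list would return "train9" instead.
```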

Time to make a prediction, let's create predict.py:

import cv2
import time
from ultralytics import YOLO
import os


def resize_and_pad(image, target_size):
    # Convert image to grayscale
    gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)


    h, w = gray_image.shape[:2]
    scale = target_size / max(h, w)
    new_w, new_h = int(w * scale), int(h * scale)
    resized_image = cv2.resize(gray_image, (new_w, new_h))


    top = (target_size - new_h) // 2
    bottom = target_size - new_h - top  # bottom/right absorb any odd pixel
    left = (target_size - new_w) // 2
    right = target_size - new_w - left


    padded_image = cv2.copyMakeBorder(
        resized_image, top, bottom, left, right,
        borderType=cv2.BORDER_CONSTANT, value=0
    )


    return padded_image


def predict_user(user_id):
    response = ''
    cap = None  # so the finally block is safe if loading a model or opening the camera fails
    try:
        # Load the YOLO models
        detection_model = YOLO(r"D:\Downloads\Howest\Semester 2\Project_one\2023-2024-projectone-ctai-NikitosKokos\AI\models\detect\train5\weights\best.pt")
        # Load the YOLO classification model for the specific user
        classification_model_path = fr"D:\Downloads\Howest\Semester 2\Project_one\2023-2024-projectone-ctai-NikitosKokos\AI\models\classify\{user_id}\best.pt"
        classification_model = YOLO(classification_model_path)


        # Attempt to open the video capture device
        cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)


        if not cap.isOpened():
            raise Exception("Error: Could not open webcam.")
       
        # Adjust camera settings
        cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0.25)  # Disable auto exposure (value may vary)
        cap.set(cv2.CAP_PROP_BRIGHTNESS, 0.5)  # Adjust brightness (0.0 to 1.0)
        cap.set(cv2.CAP_PROP_CONTRAST, 0.5)    # Adjust contrast (0.0 to 1.0)
        cap.set(cv2.CAP_PROP_EXPOSURE, -4)     # Adjust exposure (varies by camera, may need tuning)


        # Initialize variables for capturing frames and timing
        start_time = time.time()
        duration = 5  # capture for 5 seconds
        total_frames = 0


        # Directory to save high-confidence images
        save_dir = r"D:\Downloads\Howest\Semester 2\Project_one\2023-2024-projectone-ctai-NikitosKokos\AI\dataset\login"
        os.makedirs(save_dir, exist_ok=True)


        while (time.time() - start_time < duration) and not response:
            ret, frame = cap.read()


            if not ret:
                response = "Error: Could not read frame."
                break


            total_frames += 1


            # Use detection model to detect objects in the frame
            results_list = detection_model(frame)
            for results in results_list:
                if results.boxes is not None and len(results.boxes) > 0:
                    for box in results.boxes.xyxy:
                        x1, y1, x2, y2 = box.cpu().numpy().astype(int)
                        cropped_image = frame[y1:y2, x1:x2]
                       
                        # Convert cropped image to grayscale
                        gray_cropped_image = resize_and_pad(cropped_image, 320)


                        # Use classification model to classify the grayscale cropped image
                        classification_results = classification_model(gray_cropped_image)


                        if classification_results and len(classification_results) > 0:
                            classification_result = classification_results[0]
                            if classification_result.probs is not None:
                                class_probs = classification_result.probs.cpu().numpy()


                                # Access the top1 attribute for the predicted class
                                predicted_class = classification_result.probs.top1
                                if classification_result.names[predicted_class] == 'user':
                                    if classification_result.probs.top1conf > 0.9:
                                        response = 'Ok'


        print('Time elapsed:', round((time.time() - start_time), 2))


    except Exception as e:
        response = f"Exception occurred: {e}"


    finally:
        if cap is not None and cap.isOpened():
            cap.release()
        cv2.destroyAllWindows()


    return response

Here we reuse the resize_and_pad function to convert the image to grayscale and letterbox it to 320x320 pixels. In predict_user we first run the detection model to find a face, then run the user's classification model to decide whether that face belongs to the user or is unknown.
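The final accept/reject decision boils down to two checks, which can be written out on their own (`accept_login` is a hypothetical helper mirroring the condition in predict_user, including its 0.9 confidence threshold):

```python
def accept_login(pred_name, confidence, threshold=0.9):
    """Accept only when the classifier picks 'user' AND is confident about it."""
    return pred_name == "user" and confidence > threshold

print(accept_login("user", 0.95))     # → True
print(accept_login("user", 0.60))     # → False: right class, low confidence
print(accept_login("unknown", 0.99))  # → False: confident, but wrong class
```

Requiring both conditions keeps a confident 'unknown' prediction from unlocking the account.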

Making a Box for Raspberry Pi

photo_2024-06-17_00-32-04.jpg
photo_2024-06-16_23-53-31.jpg
photo_2024-06-16_23-54-14.jpg
box_05mm.png

For the assembly, you'll need a 5 mm thick multiplex (plywood) sheet measuring 600x450 mm for laser cutting. First, create an Illustrator file that tells the laser cutter how to cut the plate. I'll share my version of the file so you can use it.

Next, assemble the pieces by gluing them together, leaving the top piece unglued to allow access to the Raspberry Pi. Position your LCD display into its designated hole and secure it with double-sided tape to prevent it from falling.

Downloads

Connect to Raspberry Pi and Deploy

photo_2024-06-17_00-51-48.jpg
Mykyta Tsykunov FaceAuth - Project One

Here's how we connect our Flask app to the Raspberry Pi over Bluetooth. Make sure you replace targetDeviceMac with the MAC address of your own Raspberry Pi:

import threading
from queue import Queue
# `run` is the BLE client loop from the project's bluetooth module (see my GitHub repo)

# Creating two Queues for communication between threads.
tx_q = Queue()
rx_q = Queue()


targetDeviceName=None
targetDeviceMac="D8:3A:DD:D9:6C:7F"

def init_ble_thread():
    # Creating a new thread for running a function 'run' with specified arguments.
    ble_client_thread = threading.Thread(target=run, args=(
        rx_q, tx_q, targetDeviceName, targetDeviceMac), daemon=True)
    # Starting the thread execution.
    ble_client_thread.start()


if __name__ == '__main__':
    init_ble_thread()


    # Two pre-padded 16-character lines for the 16x2 LCD
    tx_q.put('   Welcome to       FaceAuth    ')


    app.run(host='127.0.0.1', port=5000, debug=True)

Now, let's configure the Raspberry Pi to establish a connection:

import threading
import queue
from LCD import LCD


# Bluez gatt uart service (SERVER)
from bluetooth_uart_server.bluetooth_uart_server import ble_gatt_uart_loop


def main(lcd: LCD):
    # Initialise display
    lcd.init()


    i = 0
    rx_q = queue.Queue()
    tx_q = queue.Queue()
    device_name = "mykyta-rapsi" # TODO: replace with your own (unique) device name
    threading.Thread(target=ble_gatt_uart_loop, args=(rx_q, tx_q, device_name), daemon=True).start()


    def get_spaces(string):
        # Pad out to the full 16-character LCD line width
        return ' ' * (16 - len(string))
   
    def split_string(input_string):
        # Ensure the string is at most 32 characters long
        trimmed_string = input_string[:32]
       
        # Get the first 16 symbols
        first_part = trimmed_string[:16]
       
        # Get the second 16 symbols
        second_part = trimmed_string[16:32]
       
        return first_part, second_part


    def lcd_print(string):
        if len(string) > 32:
            print(f'String {string} is too big: {len(string)}')


        if len(string) <= 16:
            lcd.send_string(f"{string}{get_spaces(string)}",1)
            lcd.send_string(" " * 16,2)
        else:
            first_part, second_part = split_string(string)
            lcd.send_string(first_part,1)
            lcd.send_string(f'{second_part}{get_spaces(second_part)}',2)


    while True:
        try:
            incoming = rx_q.get(timeout=1) # Wait for up to 1 second
            if incoming:
                print("In main loop: ({})".format(incoming))
                lcd_print(incoming)
        except queue.Empty:
            pass  # nothing arrived within the timeout
       
if __name__ == '__main__':
    lcd = LCD()
    try:
      main(lcd)
    except KeyboardInterrupt:
        pass
    finally:
        lcd.send_instruction(0x01) # Clear display & cursor home

Here I have an LCD class; its send_string method prints whatever arrives in the incoming queue on the LCD display.
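The padding and splitting that get_spaces, split_string and lcd_print do together can also be expressed compactly with str.ljust. This equivalent sketch (`format_lcd` is a made-up name) shows exactly what ends up on the two 16-character lines:

```python
def format_lcd(message, width=16):
    """Trim to 2*width chars and pad each half to a full fixed-width LCD line."""
    message = message[:2 * width]          # the 16x2 display holds 32 chars max
    line1 = message[:width].ljust(width)
    line2 = message[width:].ljust(width)
    return line1, line2

line1, line2 = format_lcd("Welcome!")
print(repr(line1))  # → 'Welcome!        '
print(repr(line2))  # → '                '
```

Padding every line to the full width matters on these displays: leftover characters from the previous message would otherwise stay visible.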

My LCD class:

from time import sleep
from RPi import GPIO
import smbus


GPIO.setmode(GPIO.BCM)


i2c = smbus.SMBus(1)


class LCD:
   def __init__(self, i2c_addr = 0x27, lcd_width = 16, lcd_chr = 1, lcd_cmd = 0, lcd_lines = [0x80, 0xC0], lcd_backlight = 0b0000_1000, enable = 0b0000_0100, e_pulse = 0.0002, e_delay = 0.0002) -> None:
      self.__i2c_addr = i2c_addr
      self.__lcd_width = lcd_width
      self.__lcd_chr = lcd_chr
      self.__lcd_cmd = lcd_cmd
      self.__lcd_lines = lcd_lines
      self.__lcd_backlight = lcd_backlight
      self.__enable = enable
      self.__e_pulse = e_pulse
      self.__e_delay = e_delay


   @property
   def lcd_width(self):
      return self.__lcd_width


   def init(self):
      # HD44780 reset sequence: send 'function set, 8-bit' before changing modes
      self.send_byte_with_e_toggle(0b0011_0000)
      self.send_byte_with_e_toggle(0b0011_0000)


      self.send_byte_with_e_toggle(0b0010_0000) # switch into 4-bit mode


      self.send_instruction(0x28) # 0010_1000 Data length, number of lines, font size
      self.send_instruction(0x06) # 000110 Cursor move direction
      self.send_instruction(0x0C) # 0000_1100 Display On, Cursor Off, Blink Off


   def send_instruction(self, byte):
      self.set_data_bits(byte, self.__lcd_cmd)
      sleep(0.001)


   def send_character(self, byte):
      self.set_data_bits(byte, self.__lcd_chr)
   
   def set_data_bits(self, value, mode):
      MSNibble = value & 0xf0
      LSNibble = (value & 0x0f) << 4


      MSNibble_byte = MSNibble | self.__lcd_backlight | mode
      LSNibble_byte = LSNibble | self.__lcd_backlight | mode
   
      sleep(self.__e_delay)
      i2c.write_byte(self.__i2c_addr, MSNibble_byte | self.__enable)   # high nibble, E high
      sleep(self.__e_pulse)
      i2c.write_byte(self.__i2c_addr, MSNibble_byte & ~self.__enable)  # high nibble, E low (latch)
      sleep(self.__e_delay)
      i2c.write_byte(self.__i2c_addr, LSNibble_byte | self.__enable)   # low nibble, E high
      sleep(self.__e_pulse)
      i2c.write_byte(self.__i2c_addr, LSNibble_byte & ~self.__enable)  # low nibble, E low (latch)
      sleep(self.__e_delay)


   def clear(self):
      self.send_instruction(0x01) # 0000_0001 Clear display


   def send_byte_with_e_toggle(self, bits):
      # Toggle the enable bit so the LCD latches the data
      sleep(self.__e_delay)
      # OR sets only the E bit high, leaving all other bits unchanged
      bits_high = bits | self.__enable
      i2c.write_byte(self.__i2c_addr, bits_high)
      sleep(self.__e_pulse)
      # AND with the negated mask (~) clears only the E bit
      bits_low = bits & ~self.__enable
      i2c.write_byte(self.__i2c_addr, bits_low)
      sleep(self.__e_delay)


   def send_string(self, message, line):
      # by default we print on line 1
      instruction = self.__lcd_lines[0]


      if line > 0 and line < 3:
         instruction = self.__lcd_lines[line-1]


      if len(message) != self.__lcd_width:
         print(f'The message ({message}) length is {len(message)} not {self.__lcd_width}!')

      self.send_instruction(instruction)


      for char in message:
         # get decimal of each symbol
         self.send_character(ord(char))

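The nibble packing in set_data_bits can be verified without any hardware attached. This sketch (`nibble_bytes` is a hypothetical name) reproduces how one byte becomes two I2C writes, each carrying the backlight and RS bits, assuming the same constants as the class defaults:

```python
LCD_BACKLIGHT = 0b0000_1000  # BT (backlight) bit
LCD_CHR = 1                  # RS bit: 1 = character data, 0 = instruction

def nibble_bytes(value, mode):
    """Split a byte into high/low nibbles, OR-ing in backlight and RS bits."""
    ms = (value & 0xF0) | LCD_BACKLIGHT | mode
    ls = ((value & 0x0F) << 4) | LCD_BACKLIGHT | mode
    return ms, ls

ms, ls = nibble_bytes(ord('H'), LCD_CHR)  # 'H' = 0x48
print(hex(ms), hex(ls))  # → 0x49 0x89
```

Each of these two bytes is then written twice over I2C, with the enable bit high and then low, so the display latches the nibble.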
The code for the bluetooth_uart_server can be found on my GitHub repository.

And finally let's deploy our code, so it runs automatically on your Raspberry Pi when it boots up:

First, create a new file named myproject.service

Place the Following Code in the File:

[Unit]
Description=ProjectOne Project
After=network.target


[Service]
ExecStart=/home/user/<name_of_your_repo>/<venv>/bin/python -u /home/user/<name_of_your_repo>/RPi/app.py
WorkingDirectory=/home/user/<name_of_your_repo>/RPi
StandardOutput=inherit
StandardError=inherit
Restart=always
User=user
CPUSchedulingPolicy=rr
CPUSchedulingPriority=99


[Install]
WantedBy=multi-user.target

Run this command to copy this file to /etc/systemd/system

sudo cp myproject.service /etc/systemd/system/myproject.service

Enable the Script to Start Automatically After Booting:

sudo systemctl enable myproject.service

To start it right away without rebooting, run sudo systemctl start myproject.service; you can check that it's running with systemctl status myproject.service.

That's it! We've finished our project :)

If you have any questions, I'd be glad to answer them. I hope this encourages someone to build this project. Good luck!