FaceAuth: Easy Login to Study Platforms Using Facial Authentication
by Mykyta Tsykunov in Circuits > Raspberry Pi

Imagine you study at a university or school and use a public computer to access your study materials. Each time, you need to type your password to log in. Or imagine your teacher wants to share a solution from their account, which means entering sensitive credentials on your computer. To make this process easier and more secure, I came up with the idea of a facial authentication AI model. This model can be integrated into any study platform, providing quick access to your study materials on any device.
Easy login is the key to making your study life simpler and more enjoyable!
The idea is to build a device with a Raspberry Pi inside and an LCD display as the output, with a USB camera connected to a laptop as the input. You can connect to the Raspberry Pi via Bluetooth and use it to log in to a study platform. Additionally, I plan to create a website to demonstrate the working model as if it were a real study platform. For this I'm going to use Flask, because you don't need a lot of knowledge to build a simple website with this Python framework.
Supplies




Prepare Your Data


The first thing we need is data. You can find face datasets on kaggle.com; I'll share the ones I found, but you can use your own as well. I used the dataset 70,000 Real Faces 6, but took only 5k images for annotation. You can also check these datasets: Human Faces (7k), Flickr-Faces-HQ Dataset (Nvidia) - Part 5 (8k) and 70,000 Real Faces 3.
Now that we have our data, it's time to upload it to Roboflow. You can create an account for free, but I would recommend choosing the Starter Plan for 14 days: you don't need to pay for it, and you get 10,000 Auto Label credits that you can use to label faces instead of annotating all of them manually.
Annotation of Your Data









Let's create a new Roboflow project; I called it facedetect. I chose face for the Annotation Group. It's really important to choose Object Detection as the project type, because we first need to detect the user's face before applying a classification model.
Our next step is to upload our data. You can upload several files or an entire folder; I chose the latter. Once all the images are uploaded, let's start the annotation. We can do this manually or use the Auto Label feature (remember, the Starter Plan allows us to auto-label up to 10,000 images). I used Auto Label for most of my data and manually annotated some images. In total, I ended up with a dataset containing 5,204 images, but we're not done yet.
Next, click Generate and set up the augmentation. In simple terms, augmentation creates more data from our dataset by applying transformations such as shearing, rotating, and changing brightness. I'll show you my settings, but feel free to add more if you want. It's better to keep it simple, though, because it's important to understand which transformations improve the model and which might reduce its accuracy. After augmentation, my dataset contained almost 20k images.
Now we can finally click Create. After creating a version of the dataset, click Export dataset, choose YOLOv8, select show download code and click Continue. You'll see a code snippet that you need to copy and paste into your project.
Training a Face Detection Model
To train our face detection model, we will use Ultralytics. If you haven't installed it, please do so. Keep in mind that different versions might have slightly different syntax. I am using Ultralytics 8.2.26, so you can install this version to avoid some errors.
First, copy the code that you get when you export your dataset and run it. You'll see a new folder with the name of your project. This is our dataset. Now let's train our model. I'm going to use CUDA, which lets you train the model on the GPU instead of the CPU and makes the process much faster. To use CUDA you need a CUDA-enabled build of PyTorch; if you don't have one, install it first. I am using PyTorch 2.0.1+cu117.
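Before starting a long training run, it can help to check whether PyTorch actually sees the GPU. The sketch below is not part of the original project: the helper name pick_device is hypothetical, and it simply falls back to the CPU when CUDA (or PyTorch itself) is not available.

```python
def pick_device() -> str:
    """Return 'cuda' when a CUDA-enabled PyTorch build sees a GPU, else 'cpu'."""
    try:
        import torch  # imported lazily so the check also works without PyTorch
    except ImportError:
        return "cpu"
    return "cuda" if torch.cuda.is_available() else "cpu"


if __name__ == "__main__":
    print(f"Training device: {pick_device()}")
```

The returned string can be passed as the device argument of model.train, so the same script runs on machines with and without a GPU.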
Here is the code to train the model:
from ultralytics import YOLO


def main():
    # Load the pretrained YOLOv8 small model
    model = YOLO(model="yolov8s.pt")
    # model = YOLO(model="yolov8n.pt")  # smaller, faster alternative

    # Set the device to GPU
    device = 'cuda'

    # Train the model on the GPU
    model.train(
        data="path_to_your_project\\facedetect-1\\data.yaml",
        epochs=20,
        imgsz=640,  # training expects a single integer image size
        verbose=True,
        batch=8,
        device=device
    )

    # Validate the model
    model.val()

    # Export the model
    model.export()


if __name__ == '__main__':
    main()
After training, you can check the runs/detect directory and see several training runs. Choose the one that contains a folder named weights and find a file called best.pt. This is our final model. For me, the best model is located in /runs/detect/train5/weights/best.pt. Now, let's test it:
from ultralytics import YOLO
# Load your trained model
model = YOLO("./runs/detect/train5/weights/best.pt")
results = model.predict(source='0', show=True)
print(results)
This code opens your webcam (source '0') and shows the detections live, so you can test your model. If it detects your face, we can move on.
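The run folder name (train5 in my case) will differ on your machine. As a convenience, and as an assumption on my side rather than something Ultralytics provides, the hypothetical helper below picks the most recently modified best.pt under runs/detect using only the standard library.

```python
from pathlib import Path
from typing import Optional


def latest_best_weights(runs_dir: str = "runs/detect") -> Optional[Path]:
    """Return the most recently modified */weights/best.pt under runs_dir, or None."""
    candidates = list(Path(runs_dir).glob("*/weights/best.pt"))
    if not candidates:
        return None
    return max(candidates, key=lambda p: p.stat().st_mtime)


if __name__ == "__main__":
    print(latest_best_weights())
```

Runs that were interrupted before any weights were saved are skipped automatically, because they contain no weights/best.pt file.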
Creating a Database and Connecting


Let's create a MySQL database to store our users. I use MySQL Workbench to create a local database that I can connect to from Python. First we create a new schema, then a new table users with the columns you can see in the screenshot.
Now it's time to write some classes.
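In case the screenshot is hard to read, here is a rough sketch of the table. The column names are taken from the queries in DataRepository.py; the types and constraints are my assumptions, and SQLite is used only as a stand-in so the snippet runs without a MySQL server.

```python
import sqlite3

# Column names come from the queries in DataRepository.py; the types and
# constraints are assumptions, and SQLite only stands in for MySQL here.
SCHEMA = """
CREATE TABLE users (
    userId    INTEGER PRIMARY KEY AUTOINCREMENT,
    firstname TEXT NOT NULL,
    lastname  TEXT NOT NULL,
    password  TEXT NOT NULL,
    type      TEXT NOT NULL,
    honorific TEXT DEFAULT '',
    recorded  INTEGER NOT NULL DEFAULT 0
)
"""


def create_demo_db() -> sqlite3.Connection:
    """Create an in-memory database containing the assumed users table."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row  # rows can be read by column name
    conn.execute(SCHEMA)
    return conn


if __name__ == "__main__":
    conn = create_demo_db()
    cur = conn.execute(
        "INSERT INTO users (firstname, lastname, password, type) VALUES (?, ?, ?, ?)",
        ("Ada", "Lovelace", "secret", "Student"),
    )
    row = conn.execute(
        "SELECT recorded FROM users WHERE userId = ?", (cur.lastrowid,)
    ).fetchone()
    print(cur.lastrowid, row["recorded"])
```

In MySQL Workbench you would pick the matching MySQL types yourself, with userId as an auto-incrementing primary key and recorded as a 0/1 flag that defaults to 0.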
Database.py:
from mysql import connector
import os


class Database:
    # 1. Open a connection (static helper so every method can reuse it)
    @staticmethod
    def __open_connection():
        try:
            db = connector.connect(
                option_files=os.path.abspath(
                    os.path.join(os.path.dirname(__file__), "../config.py")
                ),
                autocommit=False,
            )
            if "AttributeError" in (str(type(db))):
                raise Exception("invalid database parameters in config")
            cursor = db.cursor(dictionary=True, buffered=True)  # lazy loaded
            return db, cursor
        except connector.Error as err:
            if err.errno == connector.errorcode.ER_ACCESS_DENIED_ERROR:
                print("Error: access to the database was denied")
            elif err.errno == connector.errorcode.ER_BAD_DB_ERROR:
                print("Error: the database was not found")
            else:
                print(err)
            # Re-raise so callers see the real error instead of failing
            # later while unpacking a None return value
            raise

    # 2. Executes READS
    @staticmethod
    def get_rows(sqlQuery, params=None):
        result = None
        db, cursor = Database.__open_connection()
        try:
            cursor.execute(sqlQuery, params)
            result = cursor.fetchall()
            cursor.close()
            if result is None:
                print(ValueError("No results found. [DB Error]"))
        except Exception as error:
            print(error)  # development message
            result = None
        finally:
            db.close()
        return result

    @staticmethod
    def get_one_row(sqlQuery, params=None):
        result = None
        db, cursor = Database.__open_connection()
        try:
            cursor.execute(sqlQuery, params)
            result = cursor.fetchone()
            cursor.close()
            if result is None:
                raise ValueError("No results found. [DB Error]")
        except Exception as error:
            print(error)  # development message
            result = None
        finally:
            db.close()
        return result

    # 3. Executes INSERT, UPDATE, DELETE with PARAMETERS
    @staticmethod
    def execute_sql(sqlQuery, params=None):
        result = None
        db, cursor = Database.__open_connection()
        try:
            cursor.execute(sqlQuery, params)
            db.commit()
            # For an INSERT, lastrowid is the new id; otherwise it is 0
            result = cursor.lastrowid
            if result == 0:  # UPDATE or DELETE
                if cursor.rowcount == -1:  # the SQL statement failed
                    raise Exception("Error in SQL")
                # Number of rows changed (0 if the WHERE clause matched
                # nothing or the data was already identical)
                result = cursor.rowcount
        except connector.Error as error:
            db.rollback()
            result = None
            print(f"Error: data not saved. {error.msg}")
        finally:
            cursor.close()
            db.close()
        return result
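These methods open the database connection and run our queries. You also need to create config.py. Despite the .py extension, it is a plain INI-style option file that MySQL Connector reads through option_files: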
[connector_python]
user = root
host = 127.0.0.1
port = 3306
password = root
database = projectone
[application_config]
driver = 'SQL Server'
Fill in the name of your database, the user, the password and the host (127.0.0.1 by default).
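If the connection fails, a quick way to rule out typos in the option file is to parse it yourself. This is only a sanity-check sketch: the helper name read_db_settings is hypothetical, and MySQL Connector does its own parsing of the connector_python group when it connects.

```python
import configparser

SAMPLE = """
[connector_python]
user = root
host = 127.0.0.1
port = 3306
password = root
database = projectone
"""


def read_db_settings(text: str, section: str = "connector_python") -> dict:
    """Parse INI-style option text and return one section as a plain dict."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return dict(parser[section])


if __name__ == "__main__":
    settings = read_db_settings(SAMPLE)
    # Hide the password when printing
    print({k: ("***" if k == "password" else v) for k, v in settings.items()})
```

To check the real file, read its contents with open(...).read() and pass the text to the same function.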
Let's also create DataRepository.py and create all the methods we're going to use:
from .Database import Database


class DataRepository:
    @staticmethod
    def read_users():
        sql = "SELECT * FROM users"
        return Database.get_rows(sql)

    @staticmethod
    def create_user(firstname, lastname, password, type, honorific="", recorded=0):
        sql = "INSERT into users (firstname, lastname, password, type, honorific, recorded) VALUES (%s, %s, %s, %s, %s, %s)"
        params = [firstname, lastname, password, type, honorific, recorded]
        return Database.execute_sql(sql, params)

    @staticmethod
    def is_recorded(user_id):
        sql = "SELECT recorded FROM users WHERE userId=%s"
        params = [user_id]
        return Database.get_rows(sql, params)

    @staticmethod
    def get_password(user_id):
        sql = "SELECT password FROM users WHERE userId=%s"
        params = [user_id]
        return Database.get_rows(sql, params)

    @staticmethod
    def set_recorded(user_id):
        sql = "UPDATE users SET recorded = 1 WHERE userId = %s"
        params = [user_id]
        return Database.execute_sql(sql, params)

    @staticmethod
    def get_name(user_id):
        sql = "SELECT firstname, honorific FROM users WHERE userId=%s"
        params = [user_id]
        return Database.get_rows(sql, params)
These are just the simple SQL queries our program needs. If you know some basic SQL, they should be easy to follow.
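Every read method returns the result of get_rows: a list of dictionaries, an empty list when no row matches, or None when the query fails. Code that indexes data[0] directly will raise an error for an unknown user id. One way to guard against that is a small helper like the one below; first_value is a hypothetical name and not part of the project.

```python
from typing import Any, Optional


def first_value(rows: Optional[list], key: str, default: Any = None) -> Any:
    """Return rows[0][key], or default when the query returned nothing."""
    if not rows:  # covers both None (query failed) and [] (no match)
        return default
    return rows[0].get(key, default)


if __name__ == "__main__":
    print(first_value([{"recorded": 1}], "recorded"))
    print(first_value([], "recorded", default=0))
```

With this in place, a call such as first_value(DataRepository.is_recorded(user_id), 'recorded') yields None for a missing user instead of raising.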
Making a Website






For this part I'm going to make a simple website to show how it works. I think it can easily be integrated into any study platform, e.g. Leho.
Here is the code for app.py:
from repositories.DataRepository import DataRepository
from flask import Flask, request, jsonify, redirect,render_template, url_for, flash, session
from flask_cors import CORS
import secrets
import sys
import os
import threading
import socket
from queue import Queue
from BLE_client import run
# Creating two Queues for communication between threads.
tx_q = Queue()
rx_q = Queue()
targetDeviceName=None
targetDeviceMac="D8:3A:DD:D9:6C:7F"
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
app = Flask(__name__)
CORS(app)
app.secret_key = secrets.token_hex(16)
@app.route('/')
def index():
    return redirect("/signup", code=302)


# Address to get all users (GET method)
@app.route('/users', methods=['GET'])
def users():
    # tx_q.put(' All Users ')
    users = DataRepository.read_users()
    users.reverse()
    return render_template('users.html', users=users)


@app.route('/signup', methods=['GET'])
def signup():
    tx_q.put(' You need to Sign up ')
    return render_template('signup.html')


@app.route('/login/<user_id>', methods=['POST'])
def login(user_id):
    data = DataRepository.is_recorded(user_id)
    is_recorded = data[0]['recorded']
    if is_recorded == 0:
        return redirect(url_for('record_user', user_id=user_id))
    elif is_recorded == 1:
        return redirect(url_for('login_user', user_id=user_id))
    else:
        return redirect(url_for('users'))


@app.route('/login/<user_id>', methods=['GET'])
def login_user(user_id):
    data = DataRepository.is_recorded(user_id)
    is_recorded = data[0]['recorded']
    if is_recorded == 0:
        return redirect(url_for('record_user', user_id=user_id))
    elif is_recorded == 1:
        return render_template('login.html', user_id=user_id)
    else:
        return redirect(url_for('users'))


@app.route('/record/<user_id>', methods=['GET'])
def record_user(user_id):
    tx_q.put(f" Please Enter Your Password ")
    return render_template('password.html', user_id=user_id)
is_logged_in = False


@app.route('/auth', methods=['POST'])
def auth_user():
    try:
        global is_logged_in
        tx_q.put(' Look into the camera ')
        data = request.get_json()  # Access the JSON data sent by the AJAX request
        user_id = data['userId']  # Extract the user ID
        response = DataRepository.get_name(user_id)
        task_thread = threading.Thread(target=run_task, args=("PREDICT", user_id))
        task_thread.start()
        task_thread.join()
        if not is_logged_in:
            print("Couldn't login")
            tx_q.put(" Couldn't login Try again ")
        else:
            honorific = f"{response[0]['honorific']} " if response[0]['honorific'] not in ('', '-') else ''
            print(1, honorific, 1)  # debug output
            tx_q.put(f"Welcome {honorific}{response[0]['firstname']}")
        return jsonify({'message': is_logged_in})
    except Exception as e:
        return jsonify({'error': str(e)}), 500  # Return error response with status code


@app.route('/record/<user_id>', methods=['POST'])
def check_password(user_id):
    user_password = request.form['userPassword']
    data = DataRepository.get_password(user_id)
    if user_password == data[0]['password']:
        tx_q.put(f" Let's record Your face ")
        return render_template('record.html', user_id=user_id)
    else:
        tx_q.put(f" Incorrect password ")
        return render_template('password.html', user_id=user_id, error="Incorrect password")


def run_task(task_type, user_id):
    try:
        global is_logged_in
        # Send "<task_type> <user_id>" to the local socket server and wait for its reply
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.connect(('127.0.0.1', 65432))
        client.send(f"{task_type} {user_id}".encode('utf-8'))
        response = client.recv(1024).decode('utf-8')
        if task_type == "PREDICT":
            # NOTE: response is text; auth_user treats any non-empty reply as success
            is_logged_in = response
        client.close()
        return is_logged_in
    except Exception as e:
        return f"Exception occurred: {e}"
@app.route('/recording', methods=['POST'])
def recording():
    try:
        tx_q.put(f" Please rotate your face a bit ")
        data = request.get_json()  # Access the JSON data sent by the AJAX request
        user_id = data['userId']  # Extract the user ID
        task_thread = threading.Thread(target=run_task, args=("CAPTURE", user_id))
        task_thread.start()
        task_thread.join()
        return jsonify({'message': 'Ok'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500  # Return error response with status code


@app.route('/split', methods=['POST'])
def split():
    try:
        tx_q.put(f"Making a dataset")
        data = request.get_json()
        class_name = data['className']
        task_thread = threading.Thread(target=run_task, args=("SPLIT", class_name))
        task_thread.start()
        task_thread.join()
        return jsonify({'message': 'Ok'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/train', methods=['GET'])
def train():
    try:
        tx_q.put(f"Training a model")
        task_thread = threading.Thread(target=run_task, args=("TRAIN", None))
        task_thread.start()
        task_thread.join()
        return jsonify({'message': 'Ok'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500  # Return error response with status code


@app.route('/best', methods=['POST'])
def best():
    try:
        tx_q.put(f" Choosing the best model ")
        data = request.get_json()  # Access the JSON data sent by the AJAX request
        user_id = data['userId']  # Extract the user ID
        task_thread = threading.Thread(target=run_task, args=("BEST", user_id))
        task_thread.start()
        task_thread.join()
        DataRepository.set_recorded(user_id)
        flash('trained', 'trained')
        tx_q.put(f" Model has been trained! ")
        redirect_url = url_for('users')
        return jsonify({'redirect_url': redirect_url})
    except Exception as e:
        return jsonify({'error': str(e)}), 500  # Return error response with status code
# Address to ADD a new user (POST method for the form data)
@app.route('/users', methods=['POST'])
def add_user():
    # Read the submitted form data from request.form
    # The keys are the name attributes of the <input> tags in the form
    honorific = request.form.get('honorific', '')
    the_new_id = DataRepository.create_user(request.form['userFirstName'], request.form['userLastName'], request.form['userPassword'], request.form['type'], honorific, 0)
    the_new_name = f"{request.form['userFirstName']} {request.form['userLastName']}"
    flash(the_new_name, 'name')
    flash(the_new_id, 'id')
    tx_q.put(f"User {request.form['userFirstName']} added with id {the_new_id}")
    return redirect(url_for('users'))


def init_ble_thread():
    # Creating a new thread for running a function 'run' with specified arguments.
    ble_client_thread = threading.Thread(target=run, args=(
        rx_q, tx_q, targetDeviceName, targetDeviceMac), daemon=True)
    # Starting the thread execution.
    ble_client_thread.start()


if __name__ == '__main__':
    init_ble_thread()
    tx_q.put(' Welcome to FaceAuth ')
    app.run(host='127.0.0.1', port=5000, debug=True)
This is just some basic routing. The app also connects to my Raspberry Pi via Bluetooth and imports socket; I'll explain why a bit later. Now let's build the pages. The first one is users.html:
<!DOCTYPE html>
<html lang="en">
<head> <title>All Users</title> <meta charset="UTF-8"> <meta name="format-detection" content="telephone=no"> <link rel="stylesheet" href="static/css/style.min.css"><link rel="shortcut icon" href="static/img/logo/icon.svg"><meta name="viewport" content="width=device-width, initial-scale=1.0"></head>
<body>
<div class="users">
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% if messages[0][0] == 'name' %}
<div class="users__popup">
<div class="users__added">
<div class="users__check">
<span></span>
<picture><source srcset="static/img/icons/check.svg" type="image/webp"><img src="static/img/icons/check.svg" alt=""></picture>
</div>
<div class="users__text">
User <b>{{ messages[0][1] }}</b> has been successfully added with the id <b>{{ messages[1][1] }}</b>!
</div>
</div>
</div>
{% elif messages[0][0] == 'trained' %}
<div class="users__popup">
<div class="users__added">
<div class="users__check">
<span></span>
<picture><source srcset="static/img/icons/check.svg" type="image/webp"><img src="static/img/icons/check.svg" alt=""></picture>
</div>
<div class="users__text">
Model has been trained! You can now login via your face.
</div>
</div>
</div>
{% endif %}
{% endif %}
{% endwith %}
<div class="users__body">
<div class="users__top">
<a href="/signup" class="users__btn">Sign up</a>
<div class="users__title"><h1>All Users</h1></div>
<a href="/signup" class="users__btn">Sign up</a>
</div>
<ul class="users__tabs">
<li class="users__tab _active">Student</li>
<li class="users__tab _active">Teacher</li>
</ul>
<ul class="users__list">
{% for user in users %}
<li class="users__item {{ user.type }}">
<form action="{{ url_for('login', user_id=user.userId) }}" method="POST" class="users__content">
<div class="users__main">
{% if user.honorific %}
<div class="users__honorific">
{{ user.honorific }}
</div>
{% endif %}
<div class="users__name">
{{ user.firstname }} {{ user.lastname }}
</div>
</div>
<div class="users__info">
<div class="users__type">
{{ user.type }}
</div>
<button type="submit" class="users__login">
{{ 'Login' if user.recorded else 'Record' }}
</button>
</div>
</form>
</li>
{% endfor %}
</ul>
<div class="users__empty {% if not users %}_show{% endif %}">
<p>No users found</p>
</div>
</div>
</div>
<script src="static/js/vendors.min.js"></script>
<script src="static/js/app.min.js"></script>
</body>
</html>
Here I display all users. At the top there are flash popups that appear after certain actions.
The next page is signup.html, so the user can easily sign up:
<!DOCTYPE html>
<html lang="en">
<head> <title>Sign Up</title> <meta charset="UTF-8"> <meta name="format-detection" content="telephone=no"> <link rel="stylesheet" href="static/css/style.min.css"> <!-- <meta name="theme-color" content="#fff"> --> <!-- <meta name="theme-color" content="#F2709C"/> --> <link rel="shortcut icon" href="static/img/logo/icon.svg"> <!-- <meta name="robots" content="noindex, nofollow"> --> <!-- <meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=1.0, user-scalable=0"> --> <meta name="viewport" content="width=device-width, initial-scale=1.0"></head>
<body>
<div class="form">
<div class="form__body">
<div class="form__title"><h1>FaceAuth</h1></div>
<form action="/users" class="form-main" method="post">
<div class="form-main__input">
<input required class="input-anim" type="text" autocomplete="given-name" placeholder="First Name" name="userFirstName" id="userFirstName">
<label for="userFirstName"></label>
</div>
<div class="form-main__input">
<input required class="input-anim" type="text" autocomplete="family-name" placeholder="Last Name" name="userLastName" id="userLastName">
<label for="userLastName"></label>
</div>
<div class="form-main__input form-main__input_password">
<input required class="input-anim" type="password" placeholder="Password" name="userPassword" id="userPassword">
<label for="userPassword"></label>
<div class="form-main__eye">
<svg>
<use xlink:href='static/img/sprite.svg#eye'></use>
</svg>
<svg>
<use xlink:href='static/img/sprite.svg#eye-slash'></use>
</svg>
</div>
</div>
<div class="form-main__select">
<div class="select" data-name="type" data-state tabindex="0">
<div class="select__title" data-default="Who are you?"><span>Who are you?</span></div>
<div class="select__content">
<div class="select__body">
<input type="radio" class="select__input" id="student" value="Student" name="type">
<label class="select__label" for="student"><span>I'm a Student</span></label>
<span class="select__line"></span>
<input type="radio" class="select__input" id="teacher" value="Teacher" name="type">
<label class="select__label" for="teacher"><span>I'm a Teacher</span></label>
</div>
</div>
</div>
</div>
<div class="form-main__select">
<div class="select" data-name="honorific" data-state tabindex="0">
<div class="select__title" data-default="Select your title"><span>Select your title</span></div>
<div class="select__content">
<div class="select__body">
<input type="radio" class="select__input" id="mr" value="Mr" name="honorific">
<label class="select__label" for="mr"><span>Mr.</span></label>
<span class="select__line"></span>
<input type="radio" class="select__input" id="ms" value="Ms" name="honorific">
<label class="select__label" for="ms"><span>Ms.</span></label>
<span class="select__line"></span>
<input type="radio" class="select__input" id="notSpecify" value="-" name="honorific">
<label class="select__label" for="notSpecify"><span>Prefer to not specify</span></label>
</div>
</div>
</div>
</div>
<div class="form-main__bottom">
<button type="submit" class="form-main__btn btn">Sign up</button>
</div>
</form>
</div>
</div>
<script src="static/js/vendors.min.js"></script>
<script src="static/js/app.min.js"></script>
</body>
</html>
Here you can type your first name, last name and password and choose whether you're a student or a teacher. If you're a teacher, you can also choose your honorific: Mr, Ms or Prefer to not specify (-).
The next one is password.html, which asks for the user's password before recording a face:
<!DOCTYPE html>
<html lang="en">
<head> <title>Enter your password</title> <meta charset="UTF-8"> <meta name="format-detection" content="telephone=no"> <link rel="stylesheet" href="/static/css/style.min.css"><link rel="shortcut icon" href="/static/img/logo/icon.svg"><meta name="viewport" content="width=device-width, initial-scale=1.0"></head>
<body>
<div class="form">
<div class="form__body form__body_password">
<div class="form__title form__title_small"><h1>Enter your password</h1></div>
<form action="{{ url_for('check_password', user_id=user_id) }}" method="POST" class="form-main">
<div class="form-main__input form-main__input_password">
<input required class="input-anim" type="password" placeholder="Password" name="userPassword" id="userPassword">
<label for="userPassword"></label>
<div class="form-main__eye">
<svg>
<use xlink:href='/static/img/sprite.svg#eye'></use>
</svg>
<svg>
<use xlink:href='/static/img/sprite.svg#eye-slash'></use>
</svg>
</div>
</div>
{% if error %}
<div class="form-main__error">
{{ error }}
</div>
{% endif %}
<div class="form-main__bottom form-main__bottom_password">
<button type="submit" class="form-main__btn form-main__btn_big btn">Next</button>
</div>
</form>
</div>
</div>
<script src="/static/js/vendors.min.js"></script>
<script src="/static/js/app.min.js"></script>
</body>
</html>
And now of course record.html, where you can start recording your face and track all the progress:
<!DOCTYPE html>
<html lang="en">
<head> <title>Recording</title> <meta charset="UTF-8"> <meta name="format-detection" content="telephone=no"> <link rel="stylesheet" href="/static/css/style.min.css"><link rel="shortcut icon" href="/static/img/logo/icon.svg"><meta name="viewport" content="width=device-width, initial-scale=1.0"></head>
<body>
<div class="record">
<div class="record__body">
<div class="record__title"><h1>Let's record your face</h1></div>
<div class="record__main">
<div class="record__icon">
<img src="/static/img/icons/face.svg" alt="">
<img class="_hide" src="/static/img/icons/rotate.svg" alt="">
</div>
<div class="record__bottom">
<button data-id='{{ user_id }}' class="record__btn">
Start recording
</button>
</div>
</div>
</div>
</div>
<div class="progress _hide">
<div class="progress__body">
<div class="progress__title"><h1>Almost there!</h1></div>
<ul class="progress__steps">
<li class="progress__step _progress">
<div class="progress__icon">
<svg>
<use xlink:href='/static/img/sprite.svg#folder'></use>
</svg>
</div>
<p class="progress__descr">Making a dataset</p>
</li>
<li class="progress__step">
<div class="progress__icon">
<svg>
<use xlink:href='/static/img/sprite.svg#study'></use>
</svg>
</div>
<p class="progress__descr">Training a model</p>
</li>
<li class="progress__step">
<div class="progress__icon">
<svg>
<use xlink:href='/static/img/sprite.svg#star'></use>
</svg>
</div>
<p class="progress__descr">Choosing the best model</p>
</li>
</ul>
</div>
</div>
<script src="/static/js/vendors.min.js"></script>
<script src="/static/js/app.min.js"></script>
</body>
</html>
And finally, last but not least login.html:
<!DOCTYPE html>
<html lang="en">
<head> <title>Auth</title> <meta charset="UTF-8"> <meta name="format-detection" content="telephone=no"> <link rel="stylesheet" href="/static/css/style.min.css"> <!-- <meta name="theme-color" content="#fff"> --> <!-- <meta name="theme-color" content="#F2709C"/> --> <link rel="shortcut icon" href="/static/img/logo/icon.svg"> <!-- <meta name="robots" content="noindex, nofollow"> --> <!-- <meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=1.0, user-scalable=0"> --> <meta name="viewport" content="width=device-width, initial-scale=1.0"></head>
<body>
<div class="auth" data-id="{{ user_id }}">
<div class="auth__body">
<div class="auth__title auth__title_lock">
Look into<br> the camera
</div>
<div class="auth__title auth__title_error">
Oops... Try again
</div>
<div class="auth__title auth__title_open">
Welcome!
</div>
<div class="auth__icon">
<picture><source srcset="/static/img/icons/lock.svg" type="image/webp"><img src="/static/img/icons/lock.svg" alt=""></picture>
</div>
<div class="auth__open">
<div class="auth__key">
<picture><source srcset="/static/img/icons/key.svg" type="image/webp"><img src="/static/img/icons/key.svg" alt=""></picture>
</div>
</div>
</div>
</div>
<script src="/static/js/vendors.min.js"></script>
<script src="/static/js/app.min.js"></script>
</body>
</html>
This page appears when you log in as a specific user.
You can download the static folder (images, icons, css, js, fonts) from my GitHub repository, so you don't need to code it yourself :)
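One note on app.py: check_password compares the submitted password with the value stored in the database as plain text, which is fine for a demo. If you adapt this for real accounts, a common alternative is to store a salted hash instead. The sketch below uses only the standard library; the function names, the iteration count and the salt$hash storage format are my assumptions, not part of the project.

```python
import hashlib
import hmac
import os

ITERATIONS = 200_000  # assumed work factor; tune for your hardware


def hash_password(password: str) -> str:
    """Return 'salt_hex$hash_hex' for storage instead of the plain password."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Recompute the hash with the stored salt and compare in constant time."""
    salt_hex, hash_hex = stored.split("$", 1)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(digest.hex(), hash_hex)


if __name__ == "__main__":
    stored = hash_password("my-secret")
    print(verify_password("my-secret", stored), verify_password("wrong", stored))
```

In that setup, add_user would store hash_password(...) in the password column and check_password would call verify_password instead of using ==.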
Socket Server
Now that we've finished the frontend, let's talk about the backend and why we import socket. A Flask view function blocks until it returns, so if we ran a long job such as face detection or training directly inside the app, it would tie up the web application for the whole duration. By running a socket server, we can delegate the job to a separate process that runs independently and sends the response once it's finished. This allows our website to continue accepting new connections and responding to other requests while the job is running.
Here is my socket server app.py:
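To make the idea concrete, here is a stripped-down round trip over a local socket. It is only an illustration under a few assumptions: a background thread stands in for the separate server process, the sleep stands in for the real AI work, and all function names are hypothetical.

```python
import socket
import threading
import time


def serve_once(server: socket.socket) -> None:
    """Accept one client, pretend to run a slow task, and reply."""
    conn, _ = server.accept()
    with conn:
        task = conn.recv(1024).decode("utf-8")
        time.sleep(0.1)  # stand-in for detection or training work
        conn.sendall(f"done: {task}".encode("utf-8"))


def request_task(port: int, message: str) -> str:
    """Send one task message to the local server and wait for its reply."""
    with socket.create_connection(("127.0.0.1", port)) as client:
        client.sendall(message.encode("utf-8"))
        return client.recv(1024).decode("utf-8")


def demo() -> str:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
    server.listen(1)
    port = server.getsockname()[1]
    worker = threading.Thread(target=serve_once, args=(server,), daemon=True)
    worker.start()
    try:
        return request_task(port, "PREDICT 42")
    finally:
        worker.join(timeout=2)
        server.close()


if __name__ == "__main__":
    print(demo())
```

The message format, a task name followed by a space and an id, mirrors what run_task in the Flask app sends to port 65432.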
import socket
import threading
from detection import capture_images_with_yolo
from split import split_dataset_by_class
from train_detection import train_detection
from best import get_the_best_model
from predict import predict_user
import os

CONCURRENT_CONNECTIONS = 5
semaphore = threading.Semaphore(CONCURRENT_CONNECTIONS)
lock = threading.Lock()
task_running = False
server = None


def handle_client(client_socket):
    global task_running
    owns_task = False  # True only if this handler claimed the task slot
    with semaphore:
        try:
            with lock:
                if not task_running:
                    task_running = True
                    owns_task = True
                else:
                    response = "Task is already running. Please try again later."
                    client_socket.send(response.encode('utf-8'))
                    return
            # Messages have the form "<TASK> <argument>"
            task_info = client_socket.recv(1024).decode('utf-8').split(' ')
            task_type = task_info[0]
            user_id = task_info[1]
            if task_type == "CAPTURE":
                response = capture_images_with_yolo(user_id)
            elif task_type == "SPLIT":
                class_name = user_id  # Assuming user_id is used as class_name for splitting
                split_dataset_by_class(class_name)
                response = "Dataset split finished."
            elif task_type == "TRAIN":
                response = train_detection()
            elif task_type == "BEST":
                get_the_best_model(user_id)
                response = "The best model found"
            elif task_type == "PREDICT":
                response = predict_user(user_id)
            else:
                response = "Unknown task type."
            client_socket.send(response.encode('utf-8'))
        except Exception as e:
            response = f"Exception in handling client: {e}"
            client_socket.send(response.encode('utf-8'))
        finally:
            client_socket.close()
            # Only the handler that started the task may clear the flag;
            # a rejected client must not reset it while a task is running
            if owns_task:
                with lock:
                    task_running = False


def start_server(host='127.0.0.1', port=65432):
    global server
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(5)
    print(f"Server listening on {host}:{port}")
    while True:
        client_socket, addr = server.accept()
        print(f"Accepted connection from {addr}")
        client_handler = threading.Thread(target=handle_client, args=(client_socket,))
        client_handler.start()


if __name__ == "__main__":
    try:
        start_server()
    except KeyboardInterrupt:
        print("Stopping the server...")
        if server:
            server.close()
        print("Server stopped.")
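The server splits each message on spaces and reads the second element, so a message without an argument raises an IndexError. As a hedged sketch (parse_task and KNOWN_TASKS are hypothetical names, not part of the project), the parsing step could be isolated and validated like this:

```python
from typing import Optional, Tuple

# Task names taken from the dispatch in handle_client
KNOWN_TASKS = {"CAPTURE", "SPLIT", "TRAIN", "BEST", "PREDICT"}


def parse_task(message: str) -> Tuple[str, Optional[str]]:
    """Split 'TASK argument' into (task_type, argument)."""
    parts = message.strip().split(" ", 1)
    task_type = parts[0]
    if task_type not in KNOWN_TASKS:
        raise ValueError(f"Unknown task type: {task_type!r}")
    # The argument is optional; TRAIN, for example, does not need one
    argument = parts[1] if len(parts) > 1 else None
    return task_type, argument


if __name__ == "__main__":
    print(parse_task("PREDICT 7"))
    print(parse_task("TRAIN"))
```

Keep in mind that the Flask client formats the id into the message as text, so a None id arrives as the literal string "None".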
The server dispatches several tasks. Let's look at some of them.
detection.py:
import cv2
import os
import numpy as np
from ultralytics import YOLO


def resize_and_pad(image, target_size):
    # Convert image to grayscale
    gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    h, w = gray_image.shape[:2]
    scale = target_size / max(h, w)
    new_w, new_h = int(w * scale), int(h * scale)
    resized_image = cv2.resize(gray_image, (new_w, new_h))
    # Split the padding so the result is exactly target_size x target_size,
    # even when the leftover is an odd number of pixels
    top = (target_size - new_h) // 2
    bottom = target_size - new_h - top
    left = (target_size - new_w) // 2
    right = target_size - new_w - left
    padded_image = cv2.copyMakeBorder(
        resized_image, top, bottom, left, right,
        borderType=cv2.BORDER_CONSTANT, value=0
    )
    return padded_image


def apply_augmentation(image):
    # Randomly adjust contrast, brightness and saturation.
    # convertScaleAbs computes |alpha * pixel + beta|, so alpha is the
    # contrast gain and beta is an additive brightness offset.
    alpha = 1.0 + np.random.uniform(-0.5, 0.5)  # Random contrast gain
    beta = 0.5 + np.random.uniform(-0.25, 0.25)  # Random (very small) brightness offset
    saturation = 0.5 + np.random.uniform(-0.25, 0.25)  # Random saturation factor
    image = cv2.convertScaleAbs(image, alpha=alpha, beta=beta)
    hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    hsv_image[:, :, 1] = np.clip(hsv_image[:, :, 1] * saturation, 0, 255).astype(np.uint8)
    image = cv2.cvtColor(hsv_image, cv2.COLOR_HSV2BGR)
    # Randomly rotate image
    angle = np.random.uniform(-15, 15)
    rows, cols = image.shape[:2]
    M = cv2.getRotationMatrix2D((cols / 2, rows / 2), angle, 1)
    image = cv2.warpAffine(image, M, (cols, rows))
    return image


def capture_images_with_yolo(user_id):
    response = ""
    cap = None  # defined up front so the finally block can always check it
    try:
        model = YOLO(r"D:\Downloads\Howest\Semester 2\Project_one\2023-2024-projectone-ctai-NikitosKokos\AI\models\detect\train5\weights\best.pt")
        output_dir = f'AI/dataset/{user_id}'
        os.makedirs(output_dir, exist_ok=True)
        # Attempt to open the video capture device (CAP_DSHOW is the Windows DirectShow backend)
        cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
        if not cap.isOpened():
            raise Exception("Error: Could not open webcam.")
        # Adjust camera settings
        cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0.25)  # Disable auto exposure (value may vary)
        cap.set(cv2.CAP_PROP_BRIGHTNESS, 0.5)  # Adjust brightness (0.0 to 1.0)
        cap.set(cv2.CAP_PROP_CONTRAST, 0.5)  # Adjust contrast (0.0 to 1.0)
        cap.set(cv2.CAP_PROP_EXPOSURE, -4)  # Adjust exposure (varies by camera)
        fps = 30
        frames_to_capture = 10
        frame_count = 0
        original_image_count = 0
        augmented_image_count = 0
        while original_image_count < 100:
            if cap is not None:
                ret, frame = cap.read()
                if not ret:
                    response = "Error: Could not read frame."
                    break
                frame_count += 1
                # Run detection on every (fps // frames_to_capture)-th frame only
                if frame_count % (fps // frames_to_capture) == 0:
                    results_list = model(frame)
                    for results in results_list:
                        if results.boxes is not None and len(results.boxes) > 0:
                            for box in results.boxes.xyxy:
                                x1, y1, x2, y2 = box.cpu().numpy().astype(int)
                                cropped_image = frame[y1:y2, x1:x2]
                                # NOTE: augmented and original images share one directory and
                                # one naming scheme, so original N.jpg replaces augmented N.jpg
                                # Apply augmentation and save augmented images
                                for _ in range(3):  # Create 3 augmented versions per original image
                                    augmented_image = apply_augmentation(cropped_image)
                                    resized_image = resize_and_pad(augmented_image, 320)
                                    augmented_image_path = os.path.join(output_dir, f'{augmented_image_count}.jpg')
                                    cv2.imwrite(augmented_image_path, resized_image)
                                    augmented_image_count += 1
                                # Save original image
                                resized_image = resize_and_pad(cropped_image, 320)
                                original_image_path = os.path.join(output_dir, f'{original_image_count}.jpg')
                                cv2.imwrite(original_image_path, resized_image)
                                original_image_count += 1
                                if original_image_count >= 100:
                                    break
        if not response:  # keep an earlier error message instead of overwriting it
            response = "Ok"
    except Exception as e:
        response = f"Exception occurred: {e}"
    finally:
        if cap is not None and cap.isOpened():
            cap.release()
        cv2.destroyAllWindows()
    return response
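Here, we use the webcam to capture pictures. The resize_and_pad function converts each face crop to grayscale, so the model focuses on shapes rather than color, and scales it to fit 320x320 pixels; if the crop is not square, the remaining area is filled with black. The apply_augmentation function creates three randomly altered copies (contrast, brightness, saturation, rotation) of each of the 100 captured faces. Because the originals and the augmented copies are numbered from 0 in the same folder, the first 100 augmented files are overwritten by originals, which leaves 300 images for each user.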
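To make the resizing step easier to follow, here is a minimal sketch of the padding arithmetic on its own, without OpenCV. The function name letterbox_geometry is hypothetical and only for illustration. It assumes that an odd number of leftover pixels is split unevenly, so the output is always exactly square.

```python
def letterbox_geometry(height, width, target_size):
    """Return the resized (width, height) and the (top, bottom, left, right) padding."""
    # Scale the longer side to target_size and keep the aspect ratio
    scale = target_size / max(height, width)
    new_w, new_h = int(width * scale), int(height * scale)
    # Give the extra pixel to the bottom/right side when the leftover is odd
    top = (target_size - new_h) // 2
    bottom = target_size - new_h - top
    left = (target_size - new_w) // 2
    right = target_size - new_w - left
    return (new_w, new_h), (top, bottom, left, right)


size, padding = letterbox_geometry(height=200, width=102, target_size=100)
print(size, padding)  # (51, 100) (0, 0, 24, 25)
```

The resized width plus the left and right padding adds up to 100 again (51 + 24 + 25), which is what the classifier expects for a fixed input size.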
split.py:
import os
import shutil
import random


def split_dataset_by_class(class_name, train_ratio=0.7, val_ratio=0.2, test_ratio=0.1):
    # Note: test_ratio is not used directly; the test set gets whatever is left after train and val
    dataset_path = r"D:\Downloads\Howest\Semester 2\Project_one\2023-2024-projectone-ctai-NikitosKokos\AI\facerecognition-2"
    dataset_images = r'D:\Downloads\Howest\Semester 2\Project_one\2023-2024-projectone-ctai-NikitosKokos\AI\dataset'
    # Create train, val, and test directories if they don't exist
    for split in ['train', 'val', 'test']:
        split_path = os.path.join(dataset_path, split, 'user')
        if not os.path.exists(split_path):
            os.makedirs(split_path)
        else:
            # Clear the directory if it already exists
            for filename in os.listdir(split_path):
                file_path = os.path.join(split_path, filename)
                if os.path.isfile(file_path) or os.path.islink(file_path):
                    os.unlink(file_path)
                elif os.path.isdir(file_path):
                    shutil.rmtree(file_path)
    # List all images in the class folder
    class_images = [img for img in os.listdir(os.path.join(dataset_images, class_name)) if img.endswith(('.jpg', '.jpeg', '.png'))]
    random.shuffle(class_images)
    # Calculate the number of images for each split
    num_images = len(class_images)
    num_train = int(train_ratio * num_images)
    num_val = int(val_ratio * num_images)
    num_test = num_images - num_train - num_val
    # Copy images to train, val, and test directories
    for i, img in enumerate(class_images):
        img_path = os.path.join(dataset_images, class_name, img)
        if i < num_train:
            dest_folder = 'train'
        elif i < num_train + num_val:
            dest_folder = 'val'
        else:
            dest_folder = 'test'
        dest_path = os.path.join(dataset_path, dest_folder, 'user', img)
        shutil.copy(img_path, dest_path)
    print(f"Split user {class_name} images into train ({num_train}), val ({num_val}), and test ({num_test}) sets.")
This function splits our 300 images into train, val and test sets, so we can start training our classification model.
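The split logic can be tried without touching any files. The sketch below works on a list of file names only; split_names and the seed parameter are hypothetical additions that make the shuffle repeatable. As in split.py, the test set simply receives whatever is left after train and val.

```python
import random


def split_names(names, train_ratio=0.7, val_ratio=0.2, seed=0):
    """Shuffle file names reproducibly and cut them into train/val/test lists."""
    shuffled = list(names)
    random.Random(seed).shuffle(shuffled)
    num_train = int(train_ratio * len(shuffled))
    num_val = int(val_ratio * len(shuffled))
    return {
        "train": shuffled[:num_train],
        "val": shuffled[num_train:num_train + num_val],
        "test": shuffled[num_train + num_val:],  # remainder
    }


names = [f"{i}.jpg" for i in range(300)]
splits = split_names(names)
print({name: len(files) for name, files in splits.items()})
```

With 300 images and the default ratios, this gives roughly 210, 60 and 30 files. Every image lands in exactly one set.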
Making a Face Recognition Model

First, let's create a new Roboflow project and call it 'facerecognition'. I initially set the Annotation Group to 'unknown'; later we will add a 'user' group for each individual user. This approach lets us generate a separate model for each user in our program. We could create a single model covering all users, but it would take too long to train and result in poor accuracy. Next, build a base dataset: I uploaded around 1,000 images and classified them all as 'unknown'. Then export the dataset and paste it into the project as we did before. Now that we have a new folder 'facerecognition' with our dataset, let's train our model.
train_detection.py:
from ultralytics import YOLO


def train_detection():
    response = ""
    try:
        # Load the YOLO classification model
        model = YOLO(model="yolov8s-cls.pt")
        # Set the device to GPU
        device = 'cuda'
        # Train the model on the GPU
        model.train(
            data="D:\\Downloads\\Howest\\Semester 2\\Project_one\\2023-2024-projectone-ctai-NikitosKokos\\AI\\facerecognition-2",
            epochs=10,
            imgsz=320,
            # verbose=True,
            verbose=False,
            batch=8,
            device=device
        )
        # Validate the model
        model.val()
        # Export the model
        model.export()
        response = "Training completed successfully."
    except Exception as e:
        response = f"Exception occurred: {e}"
    return response
Here we just train our classification model. Next, we have to pick the best model and put it in models/classify/user-id/best.pt.
best.py:
import os
import shutil


def get_the_best_model(user_id):
    # Find the folder with the highest number containing weights/best.pt
    classify_runs_dir = r"D:\Downloads\Howest\Semester 2\Project_one\2023-2024-projectone-ctai-NikitosKokos\runs\classify"
    highest_num = -1
    highest_num_dir = None
    for folder_name in os.listdir(classify_runs_dir):
        # Only folders named "train<number>" are considered
        if folder_name.startswith("train") and folder_name[5:].isdigit():
            folder_num = int(folder_name[5:])
            weights_dir = os.path.join(classify_runs_dir, folder_name, "weights")
            best_pt_path = os.path.join(weights_dir, "best.pt")
            if os.path.exists(best_pt_path) and folder_num > highest_num:
                highest_num = folder_num
                highest_num_dir = folder_name
    if highest_num_dir:
        best_pt_path = os.path.join(classify_runs_dir, highest_num_dir, "weights", "best.pt")
        destination_dir = fr"D:\Downloads\Howest\Semester 2\Project_one\2023-2024-projectone-ctai-NikitosKokos\AI\models\classify\{user_id}"
        os.makedirs(destination_dir, exist_ok=True)
        destination_path = os.path.join(destination_dir, "best.pt")
        shutil.copyfile(best_pt_path, destination_path)
        print(f"Copied {best_pt_path} to {destination_path}")
    else:
        print("No valid training directories found with weights/best.pt.")
We look for the highest-numbered training folder that contains weights/best.pt and copy that file to models/classify/user-id/best.pt.
Time to make a prediction. Let's create predict.py:
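The folder selection can be tested on a throwaway directory tree. This is a sketch under assumptions, not the code used in the project: latest_run_with_weights is a hypothetical helper, and unlike best.py it also accepts an unnumbered "train" folder and treats it as run 1. The comparison is numeric, so train10 ranks above train2.

```python
import re
import tempfile
from pathlib import Path


def latest_run_with_weights(runs_dir):
    """Return the best.pt path of the highest-numbered train folder, or None."""
    best_num, best_path = -1, None
    for folder in Path(runs_dir).iterdir():
        match = re.fullmatch(r"train(\d*)", folder.name)
        if not match:
            continue
        run_num = int(match.group(1) or 1)  # assumption: plain "train" counts as run 1
        weights = folder / "weights" / "best.pt"
        if weights.is_file() and run_num > best_num:
            best_num, best_path = run_num, weights
    return best_path


with tempfile.TemporaryDirectory() as tmp:
    # Fake run folders; train11 has no best.pt and must be ignored
    for name, has_weights in [("train", True), ("train2", True), ("train10", True), ("train11", False)]:
        weights_dir = Path(tmp) / name / "weights"
        weights_dir.mkdir(parents=True)
        if has_weights:
            (weights_dir / "best.pt").write_bytes(b"")
    selected_run = latest_run_with_weights(tmp).parent.parent.name

print(selected_run)  # train10
```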
import cv2
import time
from ultralytics import YOLO
import os


def resize_and_pad(image, target_size):
    # Convert image to grayscale
    gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    h, w = gray_image.shape[:2]
    scale = target_size / max(h, w)
    new_w, new_h = int(w * scale), int(h * scale)
    resized_image = cv2.resize(gray_image, (new_w, new_h))
    # Split the padding so the result is exactly target_size x target_size
    top = (target_size - new_h) // 2
    bottom = target_size - new_h - top
    left = (target_size - new_w) // 2
    right = target_size - new_w - left
    padded_image = cv2.copyMakeBorder(
        resized_image, top, bottom, left, right,
        borderType=cv2.BORDER_CONSTANT, value=0
    )
    return padded_image


def predict_user(user_id):
    response = ''
    cap = None  # defined up front so the finally block is safe if model loading fails
    try:
        # Load the YOLO detection model
        detection_model = YOLO(r"D:\Downloads\Howest\Semester 2\Project_one\2023-2024-projectone-ctai-NikitosKokos\AI\models\detect\train5\weights\best.pt")
        # Load the YOLO classification model for the specific user
        classification_model_path = fr"D:\Downloads\Howest\Semester 2\Project_one\2023-2024-projectone-ctai-NikitosKokos\AI\models\classify\{user_id}\best.pt"
        classification_model = YOLO(classification_model_path)
        # Attempt to open the video capture device
        cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
        if not cap.isOpened():
            raise Exception("Error: Could not open webcam.")
        # Adjust camera settings
        cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0.25)  # Disable auto exposure (value may vary)
        cap.set(cv2.CAP_PROP_BRIGHTNESS, 0.5)  # Adjust brightness (0.0 to 1.0)
        cap.set(cv2.CAP_PROP_CONTRAST, 0.5)  # Adjust contrast (0.0 to 1.0)
        cap.set(cv2.CAP_PROP_EXPOSURE, -4)  # Adjust exposure (varies by camera, may need tuning)
        # Initialize variables for capturing frames and timing
        start_time = time.time()
        duration = 5  # capture for 5 seconds
        total_frames = 0
        # Directory to save high-confidence images
        save_dir = r"D:\Downloads\Howest\Semester 2\Project_one\2023-2024-projectone-ctai-NikitosKokos\AI\dataset\login"
        os.makedirs(save_dir, exist_ok=True)
        # Stop after 5 seconds or as soon as a response (success or error) is set
        while (time.time() - start_time < duration) and not response:
            ret, frame = cap.read()
            if not ret:
                response = "Error: Could not read frame."
                break
            total_frames += 1
            # Use detection model to detect faces in the frame
            results_list = detection_model(frame)
            for results in results_list:
                if results.boxes is not None and len(results.boxes) > 0:
                    for box in results.boxes.xyxy:
                        x1, y1, x2, y2 = box.cpu().numpy().astype(int)
                        cropped_image = frame[y1:y2, x1:x2]
                        if cropped_image.size == 0:
                            continue  # skip degenerate boxes
                        # Convert cropped image to grayscale and pad it to 320x320
                        gray_cropped_image = resize_and_pad(cropped_image, 320)
                        # Use classification model to classify the grayscale cropped image
                        classification_results = classification_model(gray_cropped_image)
                        if classification_results and len(classification_results) > 0:
                            classification_result = classification_results[0]
                            if classification_result.probs is not None:
                                # Access the top1 attribute for the predicted class
                                predicted_class = classification_result.probs.top1
                                if classification_result.names[predicted_class] == 'user':
                                    if classification_result.probs.top1conf > 0.9:
                                        response = 'Ok'
        print('Time elapsed:', round((time.time() - start_time), 2))
    except Exception as e:
        response = f"Exception occurred: {e}"
    finally:
        if cap is not None and cap.isOpened():
            cap.release()
        cv2.destroyAllWindows()
    return response
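Here we have the same resize_and_pad function to convert the image to grayscale and make it 320x320 pixels. In predict_user we run our detection model to find the face and then run the classification model to decide whether this face belongs to the given user or is unknown. The login succeeds as soon as one frame within the 5-second window is classified as 'user' with a confidence above 0.9.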
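The acceptance rule can be isolated from the camera and the models. The sketch below replays a list of made-up (label, confidence) pairs, one per frame; first_accepted_frame and the sample values are hypothetical and only illustrate the 'user' and 0.9 threshold check.

```python
def first_accepted_frame(predictions, threshold=0.9, target_label="user"):
    """Return the index of the first frame that passes the login rule, or None."""
    for index, (label, confidence) in enumerate(predictions):
        # Both conditions must hold: the right class and a high enough confidence
        if label == target_label and confidence > threshold:
            return index
    return None


# Made-up per-frame classifier outputs
frames = [("unknown", 0.97), ("user", 0.62), ("user", 0.93), ("user", 0.99)]
print(first_accepted_frame(frames))  # 2
```

A confident 'unknown' and a hesitant 'user' are both rejected; only the third frame unlocks the login. Raising the threshold makes false accepts less likely, at the cost of more failed logins for the real user.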
Making a Box for Raspberry Pi




For the assembly, you'll need a 5 mm thick multiplex wood sheet measuring 600x450 mm for laser cutting. First, create a file in Illustrator that tells the laser cutter how to cut the sheet. I'll share my version of the Illustrator file, so you can use it.
Next, assemble the pieces by gluing them together, leaving the top piece unglued to allow access to the Raspberry Pi. Position your LCD display into its designated hole and secure it with double-sided tape to prevent it from falling.
Downloads
Connect to Raspberry Pi and Deploy

Here's how we connect our Flask app to a Raspberry Pi. First, ensure you replace targetDeviceMac with the MAC address of your own Raspberry Pi:
import threading
from queue import Queue

# `app` (the Flask application) and `run` (the BLE client loop) are defined elsewhere in the project.

# Creating two Queues for communication between threads.
tx_q = Queue()
rx_q = Queue()
targetDeviceName = None
targetDeviceMac = "D8:3A:DD:D9:6C:7F"


def init_ble_thread():
    # Creating a new thread for running a function 'run' with specified arguments.
    ble_client_thread = threading.Thread(target=run, args=(
        rx_q, tx_q, targetDeviceName, targetDeviceMac), daemon=True)
    # Starting the thread execution.
    ble_client_thread.start()


if __name__ == '__main__':
    init_ble_thread()
    tx_q.put(' Welcome to FaceAuth ')
    app.run(host='127.0.0.1', port=5000, debug=True)
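The two-queue pattern can be tried without any Bluetooth hardware. In this sketch, echo_worker is a hypothetical stand-in for the BLE client thread: it takes messages from the outgoing queue and reports them back on the incoming one. The None sentinel used to stop the worker is also an assumption for the demo.

```python
import queue
import threading


def echo_worker(rx_q, tx_q):
    """Hypothetical stand-in for the BLE client: echoes outgoing messages back."""
    while True:
        message = tx_q.get()  # blocks until the main thread queues something
        if message is None:   # sentinel: stop the worker
            break
        rx_q.put(f"sent: {message}")


tx_q = queue.Queue()  # main thread -> worker
rx_q = queue.Queue()  # worker -> main thread
worker = threading.Thread(target=echo_worker, args=(rx_q, tx_q), daemon=True)
worker.start()

tx_q.put("Welcome to FaceAuth")
reply = rx_q.get(timeout=2)
tx_q.put(None)
worker.join(timeout=2)
print(reply)  # sent: Welcome to FaceAuth
```

Because queue.Queue is thread-safe, the web app and the Bluetooth thread never need to share any other state.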
Now, let's configure the Raspberry Pi to establish a connection:
import threading
import queue
from LCD import LCD
# Bluez gatt uart service (SERVER)
from bluetooth_uart_server.bluetooth_uart_server import ble_gatt_uart_loop


def main(lcd: LCD):
    # Initialise display
    lcd.init()
    rx_q = queue.Queue()
    tx_q = queue.Queue()
    device_name = "mykyta-rapsi"  # TODO: replace with your own (unique) device name
    threading.Thread(target=ble_gatt_uart_loop, args=(rx_q, tx_q, device_name), daemon=True).start()

    def get_spaces(string):
        # Spaces needed to pad the string to one full display line
        spaces = ' ' * max(0, lcd.lcd_width - len(string))
        return spaces

    def split_string(input_string):
        # Ensure the string is at most 32 characters long
        trimmed_string = input_string[:32]
        # Get the first 16 symbols
        first_part = trimmed_string[:16]
        # Get the second 16 symbols
        second_part = trimmed_string[16:32]
        return first_part, second_part

    def lcd_print(string):
        if len(string) > 32:
            print(f'String {string} is too big: {len(string)}')
        if len(string) <= 16:
            lcd.send_string(f"{string}{get_spaces(string)}", 1)
            lcd.send_string(" " * 16, 2)
        else:
            first_part, second_part = split_string(string)
            lcd.send_string(first_part, 1)
            lcd.send_string(f'{second_part}{get_spaces(second_part)}', 2)

    while True:
        try:
            incoming = rx_q.get(timeout=1)  # Wait for up to 1 second
            if incoming:
                print("In main loop: ({})".format(incoming))
                lcd_print(incoming)
        except queue.Empty:
            pass  # nothing in the queue


if __name__ == '__main__':
    lcd = LCD()
    try:
        main(lcd)
    except KeyboardInterrupt:
        pass
    finally:
        lcd.send_instruction(0x01)  # Clear display & cursor home
Here I have an LCD class; I use its send_string method to print on the LCD display whenever a message arrives in incoming.
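The text layout for the 16x2 display can be checked on any computer. In this minimal sketch, format_for_lcd is a hypothetical helper. It cuts a message to 32 characters and returns two rows, each padded with spaces to exactly 16 characters so leftovers from a previous message are overwritten.

```python
def format_for_lcd(message, width=16, lines=2):
    """Trim a message to the display size and return one padded string per row."""
    trimmed = message[:width * lines]
    return [trimmed[i * width:(i + 1) * width].ljust(width) for i in range(lines)]


rows = format_for_lcd("Welcome to FaceAuth")
for row in rows:
    print(repr(row))
```

The first row holds 'Welcome to FaceA' and the second row holds 'uth' followed by 13 spaces.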
My LCD class:
from time import sleep
from RPi import GPIO
import smbus

GPIO.setmode(GPIO.BCM)
i2c = smbus.SMBus(1)


class LCD:
    def __init__(self, i2c_addr = 0x27, lcd_width = 16, lcd_chr = 1, lcd_cmd = 0, lcd_lines = [0x80, 0xC0], lcd_backlight = 0b0000_1000, enable = 0b0000_0100, e_pulse = 0.0002, e_delay = 0.0002) -> None:
        self.__i2c_addr = i2c_addr
        self.__lcd_width = lcd_width
        self.__lcd_chr = lcd_chr
        self.__lcd_cmd = lcd_cmd
        self.__lcd_lines = lcd_lines
        self.__lcd_backlight = lcd_backlight
        self.__enable = enable
        self.__e_pulse = e_pulse
        self.__e_delay = e_delay

    @property
    def lcd_width(self):
        return self.__lcd_width

    def init(self):
        # Repeat the 8-bit function-set command so the controller is in a known state
        self.send_byte_with_e_toggle(0b0011_0000)
        self.send_byte_with_e_toggle(0b0011_0000)
        self.send_byte_with_e_toggle(0b0010_0000)  # put into 4bit-mode
        self.send_instruction(0x28)  # 0010_1000 Data length, number of lines, font size
        self.send_instruction(0x06)  # 0000_0110 Cursor move direction
        self.send_instruction(0x0C)  # 0000_1100 Display On, Cursor Off, Blink Off

    def send_instruction(self, byte):
        self.set_data_bits(byte, self.__lcd_cmd)
        sleep(0.001)

    def send_character(self, byte):
        self.set_data_bits(byte, self.__lcd_chr)

    def set_data_bits(self, value, mode):
        # Split the byte into two nibbles; each goes out on the upper four data bits
        MSNibble = value & 0xf0
        LSNibble = (value & 0x0f) << 4
        # Add the backlight bit and the RS bit ("mode": 0 = instruction, 1 = character)
        MSNibble_byte = MSNibble | self.__lcd_backlight | mode
        LSNibble_byte = LSNibble | self.__lcd_backlight | mode
        sleep(self.__e_delay)
        i2c.write_byte(self.__i2c_addr, MSNibble_byte | self.__enable)  # MSNibble with E bit high
        sleep(self.__e_pulse)
        i2c.write_byte(self.__i2c_addr, MSNibble_byte & ~self.__enable)  # MSNibble with E bit low
        sleep(self.__e_delay)
        i2c.write_byte(self.__i2c_addr, LSNibble_byte | self.__enable)  # LSNibble with E bit high
        sleep(self.__e_pulse)
        i2c.write_byte(self.__i2c_addr, LSNibble_byte & ~self.__enable)  # LSNibble with E bit low
        sleep(self.__e_delay)

    def clear(self):
        self.send_instruction(0x01)  # 0000_0001 Clear display

    def send_byte_with_e_toggle(self, bits):
        # Toggle enable
        sleep(self.__e_delay)
        # Write data to i2c with E bit HIGH:
        # OR sets the E bit and leaves all other bits unchanged
        bits_high = bits | self.__enable
        i2c.write_byte(self.__i2c_addr, bits_high)
        sleep(self.__e_pulse)
        # Write data to i2c with E bit LOW:
        # AND with the negated mask (~) clears the E bit and leaves all other bits unchanged
        bits_low = bits & ~self.__enable
        i2c.write_byte(self.__i2c_addr, bits_low)
        sleep(self.__e_delay)

    def send_string(self, message, line):
        # by default we print on line 1
        instruction = self.__lcd_lines[0]
        if 0 < line < 3:
            instruction = self.__lcd_lines[line-1]
        if len(message) != self.__lcd_width:
            print(f'The message ({message}) length is {len(message)} not {self.__lcd_width}!')
        # Move the cursor to the start of the chosen line
        self.send_instruction(instruction)
        for char in message:
            # get the character code of each symbol
            self.send_character(ord(char))
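The nibble handling in set_data_bits can be verified without a display. The sketch below reproduces only the bit arithmetic; nibble_frames is a hypothetical helper that returns the four bytes that would be written to the I2C bus, using the same default backlight (0b0000_1000) and enable (0b0000_0100) masks as the class above.

```python
def nibble_frames(value, mode, backlight=0b0000_1000, enable=0b0000_0100):
    """Return the four I2C bytes for one LCD byte: high nibble E=1, E=0, low nibble E=1, E=0."""
    high = (value & 0xF0) | backlight | mode
    low = ((value & 0x0F) << 4) | backlight | mode
    return [high | enable, high & ~enable, low | enable, low & ~enable]


# Character 'A' (0x41) sent as data (mode=1 sets the RS bit)
frames = nibble_frames(ord("A"), mode=1)
print([hex(b) for b in frames])  # ['0x4d', '0x49', '0x1d', '0x19']
```

Each nibble is sent twice, once with the enable bit set and once with it cleared; the falling edge is what makes the display latch the data.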
The code for the bluetooth_uart_server can be found on my GitHub repository.
And finally let's deploy our code, so it runs automatically on your Raspberry Pi when it boots up:
First, create a new file named myproject.service
Place the Following Code in the File:
[Unit]
Description=ProjectOne Project
After=network.target
[Service]
ExecStart=/home/user/<name_of_your_repo>/<venv>/bin/python -u /home/user/<name_of_your_repo>/RPi/app.py
WorkingDirectory=/home/user/<name_of_your_repo>/RPi
StandardOutput=inherit
StandardError=inherit
Restart=always
User=user
CPUSchedulingPolicy=rr
CPUSchedulingPriority=99
[Install]
WantedBy=multi-user.target
Run this command to copy this file to /etc/systemd/system
sudo cp myproject.service /etc/systemd/system/myproject.service
Enable the Script to Start Automatically After Booting:
sudo systemctl enable myproject.service
That's it! We've finished our project :)
If you have any questions, I'm glad to answer them. I hope this encourages someone to make this project. Good luck!