293 lines
12 KiB
Python
293 lines
12 KiB
Python
from flask import Flask, request, render_template, redirect, url_for, session, jsonify
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from cryptography.fernet import Fernet
|
|
import os
|
|
from datetime import datetime
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from flask_socketio import SocketIO, emit, join_room, leave_room
|
|
|
|
# --- Application, database and Socket.IO setup -------------------------------
app = Flask(__name__)

# Prefer a stable secret supplied through the environment so signed session
# cookies stay valid across restarts and across worker processes. When no
# SECRET_KEY is configured, fall back to a random per-process key, which is
# what this module always did before.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or os.urandom(24)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///site.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)
socketio = SocketIO(app)
|
|
|
|
# Function to generate a new encryption key
|
|
def generate_key():
    """Generate a new Fernet key and store it in ``secret.key``.

    The file is written relative to the current working directory and an
    existing file is truncated and overwritten. When the file has to be
    created it is created with owner-only permissions (0o600), because
    anyone able to read it can decrypt every stored message.
    """
    key = Fernet.generate_key()
    # os.open lets us set the creation mode; plain open() would use the
    # process umask and usually leave the key readable by other users.
    fd = os.open("secret.key", os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "wb") as key_file:
        key_file.write(key)
|
|
|
|
# Function to load the encryption key
|
|
def load_key():
    """Return the raw bytes stored in ``secret.key``.

    The path is relative to the current working directory. Raises
    ``FileNotFoundError`` (an ``OSError``) when the file does not exist.
    """
    # Use a context manager so the file handle is closed deterministically
    # instead of being left for the garbage collector.
    with open("secret.key", "rb") as key_file:
        return key_file.read()
|
|
|
|
# Ensure the key file exists
|
|
# First start-up: no key file yet, so create one before it is loaded below.
if not os.path.exists("secret.key"):
    generate_key()

# Module-wide key and Fernet cipher; the route handlers use `cipher` to
# encrypt message bodies before storing them and to decrypt them on read.
key = load_key()
cipher = Fernet(key)
|
|
|
|
class User(db.Model):
    """A registered account."""

    id = db.Column(db.Integer, primary_key=True)
    # Login name; unique across all accounts.
    username = db.Column(db.String(150), unique=True, nullable=False)
    # Password hash as produced by werkzeug's generate_password_hash.
    password = db.Column(db.String(150), nullable=False)
    # Presence flag toggled by the Socket.IO connect/disconnect handlers.
    online = db.Column(db.Boolean, nullable=False, default=False)
|
|
|
|
class Message(db.Model):
    """A message exchanged between two users who are friends."""

    id = db.Column(db.Integer, primary_key=True)
    # Sender and receiver are stored as usernames, not foreign keys.
    sender = db.Column(db.String(150), nullable=False)
    receiver = db.Column(db.String(150), nullable=False)
    # Fernet token (text form) of the message body.
    content = db.Column(db.Text, nullable=False)
    # Naive UTC creation time, filled in when no timestamp is supplied.
    timestamp = db.Column(
        db.DateTime,
        nullable=False,
        default=datetime.utcnow,
    )
|
|
|
|
class PendingMessage(db.Model):
    """A message sent to a user who is not (yet) a friend of the sender.

    Rows are copied into ``Message`` and deleted when the matching friend
    request is accepted.
    """

    id = db.Column(db.Integer, primary_key=True)
    # Sender and receiver are stored as usernames, not foreign keys.
    sender = db.Column(db.String(150), nullable=False)
    receiver = db.Column(db.String(150), nullable=False)
    # Fernet token (text form) of the message body.
    content = db.Column(db.Text, nullable=False)
    # Naive UTC creation time, filled in when no timestamp is supplied.
    timestamp = db.Column(
        db.DateTime,
        nullable=False,
        default=datetime.utcnow,
    )
|
|
|
|
class Friend(db.Model):
    """One direction of a friendship: ``user_id`` has ``friend_id`` as friend.

    Accepting a friend request inserts two rows, one per direction.
    """

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(
        db.Integer, db.ForeignKey('user.id'), nullable=False
    )
    friend_id = db.Column(
        db.Integer, db.ForeignKey('user.id'), nullable=False
    )

    # Both relationships target User, so each one names its own foreign key.
    user = db.relationship('User', foreign_keys=[user_id])
    friend = db.relationship('User', foreign_keys=[friend_id])
|
|
|
|
class FriendRequest(db.Model):
    """A friend request from one user to another."""

    id = db.Column(db.Integer, primary_key=True)
    sender_id = db.Column(
        db.Integer, db.ForeignKey('user.id'), nullable=False
    )
    receiver_id = db.Column(
        db.Integer, db.ForeignKey('user.id'), nullable=False
    )
    # Starts as 'pending'; set to 'accepted' when the receiver accepts.
    status = db.Column(db.String(20), nullable=False, default='pending')

    # Both relationships target User, so each one names its own foreign key.
    sender = db.relationship('User', foreign_keys=[sender_id])
    receiver = db.relationship('User', foreign_keys=[receiver_id])
|
|
|
|
@app.route('/')
def index():
    """Redirect logged-in visitors to the dashboard, everyone else to login."""
    endpoint = 'dashboard' if 'username' in session else 'login'
    return redirect(url_for(endpoint))
|
|
|
|
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create a new account (POST).

    On POST the submitted password is hashed with werkzeug's default method
    and a ``User`` row is inserted. A successful insert redirects to the
    login page; a failed insert returns the plain-text message
    ``'Username already exists!'``.
    """
    if request.method != 'POST':
        return render_template('register.html')

    username = request.form['username']
    password = request.form['password']
    hashed_password = generate_password_hash(password)  # Default method
    new_user = User(username=username, password=hashed_password)

    try:
        db.session.add(new_user)
        db.session.commit()
    except Exception:
        # The expected failure here is the UNIQUE constraint on username.
        # Catch Exception rather than using a bare `except:` so SystemExit
        # and KeyboardInterrupt still propagate, and roll back so the
        # session is usable again after the failed flush.
        db.session.rollback()
        return 'Username already exists!'
    return redirect(url_for('login'))
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the submitted form (POST).

    A matching username/password pair stores the username in the session
    and redirects to the dashboard; anything else returns the plain-text
    message ``'Invalid credentials'``.
    """
    if request.method != 'POST':
        return render_template('login.html')

    submitted_name = request.form['username']
    submitted_password = request.form['password']
    account = User.query.filter_by(username=submitted_name).first()

    if not account:
        return 'Invalid credentials'
    if not check_password_hash(account.password, submitted_password):
        return 'Invalid credentials'

    session['username'] = account.username
    return redirect(url_for('dashboard'))
|
|
|
|
@app.route('/dashboard')
def dashboard():
    """Render the dashboard for the logged-in user.

    The template receives the user's friends (``User`` rows), the pending
    friend requests addressed to them, and two lists of
    ``(sender, plaintext, timestamp)`` tuples: pending messages and
    delivered messages whose receiver is the current user.

    Visitors without a session, or whose session names an account that no
    longer exists, are redirected to the login page.
    """
    if 'username' not in session:
        return redirect(url_for('login'))

    username = session['username']
    user = User.query.filter_by(username=username).first()
    if user is None:
        # Stale session (e.g. the account row is gone): drop it instead of
        # failing with AttributeError on `user.id` below.
        session.pop('username', None)
        return redirect(url_for('login'))

    friend_ids = [
        link.friend_id
        for link in Friend.query.filter_by(user_id=user.id).all()
    ]
    friend_users = User.query.filter(User.id.in_(friend_ids)).all()

    friend_requests = FriendRequest.query.filter_by(
        receiver_id=user.id, status='pending'
    ).all()
    pending_messages = PendingMessage.query.filter_by(receiver=username).all()
    messages = Message.query.filter_by(receiver=username).all()

    def _as_plain_tuple(msg):
        # Stored content is a Fernet token in text form; decrypt for display.
        plaintext = cipher.decrypt(msg.content.encode()).decode()
        return (msg.sender, plaintext, msg.timestamp)

    decrypted_pending_messages = [_as_plain_tuple(m) for m in pending_messages]
    decrypted_messages = [_as_plain_tuple(m) for m in messages]

    return render_template(
        'dashboard.html',
        username=username,
        friends=friend_users,
        friend_requests=friend_requests,
        pending_messages=decrypted_pending_messages,
        messages=decrypted_messages,
    )
|
|
|
|
@app.route('/send_message/<receiver>', methods=['POST'])
def send_message(receiver):
    """Store a message from the logged-in user to ``receiver``.

    Expects a JSON object with a string ``content`` field. The content is
    encrypted with the module cipher before it is stored. If the sender has
    ``receiver`` as a friend, the message is saved as a ``Message`` and a
    ``new_message`` Socket.IO event is emitted to the receiver's room;
    otherwise it is saved as a ``PendingMessage``.

    Returns a JSON body with status 200 on success, 400 for a missing or
    malformed payload, 401 when not logged in (or the session names an
    unknown account) and 404 when ``receiver`` does not exist.
    """
    if 'username' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    # silent=True yields None instead of raising on a missing/invalid JSON
    # body, so bad requests get a clean 400 rather than a 500.
    data = request.get_json(silent=True)
    content = data.get('content') if isinstance(data, dict) else None
    if not isinstance(content, str):
        return jsonify({'error': 'Invalid message payload'}), 400

    sender_name = session['username']
    user = User.query.filter_by(username=sender_name).first()
    if user is None:
        # Session refers to an account that no longer exists.
        return jsonify({'error': 'Unauthorized'}), 401

    receiver_user = User.query.filter_by(username=receiver).first()
    if not receiver_user:
        return jsonify({'error': 'User not found'}), 404

    encrypted_content = cipher.encrypt(content.encode()).decode()

    # Check if they are friends
    friend = Friend.query.filter_by(
        user_id=user.id, friend_id=receiver_user.id
    ).first()

    if friend:
        new_message = Message(
            sender=sender_name, receiver=receiver, content=encrypted_content
        )
        db.session.add(new_message)
        db.session.commit()
        # The plaintext is already at hand; no need to decrypt what was
        # encrypted a few lines above.
        socketio.emit('new_message', {
            'sender': sender_name,
            'content': content,
            'timestamp': new_message.timestamp.strftime("%Y-%m-%d %H:%M:%S")
        }, room=receiver)
    else:
        pending_message = PendingMessage(
            sender=sender_name, receiver=receiver, content=encrypted_content
        )
        db.session.add(pending_message)
        db.session.commit()

    return jsonify({'status': 'Message sent'}), 200
|
|
|
|
@app.route('/get_messages/<friend_username>', methods=['GET'])
def get_messages(friend_username):
    """Return the decrypted conversation with ``friend_username`` as JSON.

    The response is ``{'messages': [...]}`` where each entry has ``sender``,
    ``content`` (plaintext) and ``timestamp`` (``YYYY-MM-DD HH:MM:SS``).
    Responds with a 401 JSON error when nobody is logged in.
    """
    if 'username' not in session:
        return jsonify({'error': 'Unauthorized'}), 401

    me = session['username']
    # A conversation is every message in either direction between the two.
    outgoing = (Message.sender == me) & (Message.receiver == friend_username)
    incoming = (Message.sender == friend_username) & (Message.receiver == me)
    conversation = Message.query.filter(outgoing | incoming).all()

    payload = []
    for msg in conversation:
        plaintext = cipher.decrypt(msg.content.encode()).decode()
        payload.append({
            'sender': msg.sender,
            'content': plaintext,
            'timestamp': msg.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
        })
    return jsonify({'messages': payload})
|
|
|
|
@app.route('/add_friend', methods=['POST'])
def add_friend():
    """Send a friend request from the logged-in user to ``friend_username``.

    Reads ``friend_username`` from the submitted form. On success a pending
    ``FriendRequest`` is stored, a ``friend_request`` Socket.IO event is
    emitted to the target's room, and the client is redirected to the
    dashboard. If an identical pending request already exists, no new row
    is created and no event is emitted. Unknown targets and self-requests
    return a plain-text error; visitors without a valid session are
    redirected to login.
    """
    if 'username' not in session:
        return redirect(url_for('login'))

    friend_username = request.form['friend_username']
    user = User.query.filter_by(username=session['username']).first()
    if user is None:
        # Stale session: the account it names no longer exists.
        session.pop('username', None)
        return redirect(url_for('login'))

    friend = User.query.filter_by(username=friend_username).first()
    if not friend or user == friend:
        return 'Friend not found or cannot add yourself as a friend'

    # Re-submitting the form must not pile up duplicate pending requests.
    already_pending = FriendRequest.query.filter_by(
        sender_id=user.id, receiver_id=friend.id, status='pending'
    ).first()
    if already_pending is None:
        friend_request = FriendRequest(sender_id=user.id, receiver_id=friend.id)
        db.session.add(friend_request)
        db.session.commit()
        socketio.emit('friend_request', {
            'sender': session['username'],
            'receiver': friend_username
        }, room=friend_username)

    return redirect(url_for('dashboard'))
|
|
|
|
@app.route('/accept_friend/<int:request_id>', methods=['POST'])
def accept_friend(request_id):
    """Accept the friend request ``request_id`` addressed to the current user.

    Marks the request as accepted, creates the friendship in both
    directions, moves the sender's pending messages for this user into
    ``Message``, and emits ``friend_request_accepted`` to the sender's room.
    All database changes are committed together. A request that is no
    longer pending is left untouched and the client is simply redirected to
    the dashboard.

    Returns a redirect to the dashboard on success, the plain-text message
    ``'Friend request not found'`` when the request does not exist or
    belongs to someone else, and a redirect to login without a session.
    """
    if 'username' not in session:
        return redirect(url_for('login'))

    friend_request = FriendRequest.query.get(request_id)
    if not friend_request or friend_request.receiver.username != session['username']:
        return 'Friend request not found'

    if friend_request.status != 'pending':
        # Already handled (e.g. a double submit): accepting again would
        # insert duplicate Friend rows, so treat it as a no-op.
        return redirect(url_for('dashboard'))

    sender_name = friend_request.sender.username
    receiver_name = friend_request.receiver.username
    user_id = friend_request.receiver_id
    friend_id = friend_request.sender_id

    friend_request.status = 'accepted'

    # Create friendships both ways, skipping a direction that already exists.
    for owner_id, other_id in ((user_id, friend_id), (friend_id, user_id)):
        existing = Friend.query.filter_by(
            user_id=owner_id, friend_id=other_id
        ).first()
        if existing is None:
            db.session.add(Friend(user_id=owner_id, friend_id=other_id))

    # Move pending messages to messages
    pending_messages = PendingMessage.query.filter_by(
        sender=sender_name, receiver=receiver_name
    ).all()
    for pm in pending_messages:
        db.session.add(Message(
            sender=pm.sender,
            receiver=pm.receiver,
            content=pm.content,
            timestamp=pm.timestamp,
        ))
        db.session.delete(pm)

    # Single commit so status, friendships and moved messages succeed or
    # fail together.
    db.session.commit()

    socketio.emit('friend_request_accepted', {
        'sender': sender_name,
        'receiver': receiver_name
    }, room=sender_name)

    return redirect(url_for('dashboard'))
|
|
|
|
@app.route('/reject_friend/<int:request_id>', methods=['POST'])
def reject_friend(request_id):
    """Reject (delete) the friend request ``request_id`` sent to the current user.

    Deletes the request row and emits ``friend_request_rejected`` to the
    sender's room. Returns a redirect to the dashboard on success, the
    plain-text message ``'Friend request not found'`` when the request does
    not exist or belongs to someone else, and a redirect to login without a
    session.
    """
    if 'username' not in session:
        return redirect(url_for('login'))

    friend_request = FriendRequest.query.get(request_id)
    if not friend_request or friend_request.receiver.username != session['username']:
        return 'Friend request not found'

    # Read the usernames BEFORE deleting: once the row is deleted and the
    # transaction committed, the instance is detached and lazy-loading its
    # `sender` relationship would fail.
    sender_name = friend_request.sender.username
    receiver_name = friend_request.receiver.username

    db.session.delete(friend_request)
    db.session.commit()

    socketio.emit('friend_request_rejected', {
        'sender': sender_name,
        'receiver': receiver_name
    }, room=sender_name)

    return redirect(url_for('dashboard'))
|
|
|
|
@app.route('/remove_friend/<int:friend_id>', methods=['POST'])
def remove_friend(friend_id):
    """Remove the friendship between the current user and user ``friend_id``.

    Deletes the current user's ``Friend`` row and, if present, the
    reciprocal row, then emits ``friend_removed`` to the removed friend's
    room. Returns a redirect to the dashboard on success, the plain-text
    message ``'Friend not found'`` when no such friendship exists, and a
    redirect to login without a valid session.
    """
    if 'username' not in session:
        return redirect(url_for('login'))

    user = User.query.filter_by(username=session['username']).first()
    if user is None:
        # Stale session: the account it names no longer exists.
        session.pop('username', None)
        return redirect(url_for('login'))

    friend = Friend.query.filter_by(user_id=user.id, friend_id=friend_id).first()
    if not friend:
        return 'Friend not found'

    # Read the username BEFORE deleting: after delete + commit the row is
    # detached and lazy-loading its `friend` relationship would fail.
    removed_username = friend.friend.username

    db.session.delete(friend)
    reciprocal_friend = Friend.query.filter_by(
        user_id=friend_id, friend_id=user.id
    ).first()
    if reciprocal_friend:
        db.session.delete(reciprocal_friend)
    db.session.commit()

    socketio.emit('friend_removed', {
        'sender': session['username'],
        'receiver': removed_username
    }, room=removed_username)

    return redirect(url_for('dashboard'))
|
|
|
|
@app.route('/logout')
def logout():
    """Drop the username from the session and redirect to the login page."""
    # The default of None makes this a no-op when nobody is logged in.
    session.pop('username', None)
    login_url = url_for('login')
    return redirect(login_url)
|
|
|
|
@socketio.on('join')
def handle_join(data):
    """Put the connecting socket into the room named after its own user.

    ``data`` is expected to be a mapping with a ``username`` key. Rooms are
    keyed by username and carry that user's private events (new messages,
    friend requests), so the request is honoured only when the requested
    name matches the username stored in the session; anything else is
    ignored.
    """
    current_user = session.get('username')
    requested = data.get('username') if isinstance(data, dict) else None
    # Never trust the client-supplied name on its own: without this check
    # any client could subscribe to another user's room.
    if current_user is None or requested != current_user:
        return
    join_room(current_user)
|
|
|
|
@socketio.on('leave')
def handle_leave(data):
    """Remove the calling socket from the room named ``data['username']``."""
    room_name = data['username']
    leave_room(room_name)
|
|
|
|
@socketio.on('connect')
def handle_connect():
    """Mark the session's user as online and broadcast ``user_online``.

    Does nothing when the session has no username or the username does not
    match an existing account.
    """
    if 'username' not in session:
        return

    account = User.query.filter_by(username=session['username']).first()
    if not account:
        return

    account.online = True
    db.session.commit()
    emit('user_online', {'username': account.username}, broadcast=True)
|
|
|
|
@socketio.on('disconnect')
def handle_disconnect():
    """Mark the session's user as offline and broadcast ``user_offline``.

    Does nothing when the session has no username or the username does not
    match an existing account.
    """
    if 'username' not in session:
        return

    account = User.query.filter_by(username=session['username']).first()
    if not account:
        return

    account.online = False
    db.session.commit()
    emit('user_offline', {'username': account.username}, broadcast=True)
|
|
|
|
if __name__ == '__main__':
    # Create any tables that do not exist yet; create_all needs an
    # application context to find the configured database.
    with app.app_context():
        db.create_all()

    # Start the Socket.IO-aware development server.
    # NOTE(review): debug=True is hard-coded - confirm this entry point is
    # never used for a production deployment.
    socketio.run(app, debug=True)
|