mirror of
https://github.com/Mailu/Mailu.git
synced 2024-12-22 22:13:11 +02:00
266 lines
16 KiB
Python
266 lines
16 KiB
Python
from flask_restx import Resource, fields, marshal
|
|
import validators, datetime
|
|
|
|
from . import api, response_fields
|
|
from .. import common
|
|
from ... import models
|
|
|
|
# Shorthand for the database handle exposed by the models module; the
# handlers below use db.session to add, delete and commit users.
db = models.db
|
|
|
|
# API namespace on which the user models, routes and response docs are declared.
user = api.namespace('user', description='User operations')
|
|
|
|
# Response model used when marshalling users for the GET endpoints.
# Examples are given in the JSON type of their field (integers for
# fields.Integer, a list for fields.List) so the generated documentation
# shows a valid sample document.
user_fields_get = api.model('UserGet', {
    'email': fields.String(description='The email address of the user', example='John.Doe@example.com', attribute='_email'),
    'password': fields.String(description="Hash of the user's password; Example='$bcrypt-sha256$v=2,t=2b,r=12$fmsAdJbYAD1gGQIE5nfJq.$zLkQUEs2XZfTpAEpcix/1k5UTNPm0jO'"),
    'comment': fields.String(description='A description for the user. This description is shown on the Users page', example='my comment'),
    'quota_bytes': fields.Integer(description='The maximum quota for the user’s email box in bytes', example=1000000000),
    'global_admin': fields.Boolean(description='Make the user a global administrator'),
    'enabled': fields.Boolean(description='Enable the user. When an user is disabled, the user is unable to login to the Admin GUI or webmail or access his email via IMAP/POP3 or send mail'),
    'enable_imap': fields.Boolean(description='Allow email retrieval via IMAP'),
    'enable_pop': fields.Boolean(description='Allow email retrieval via POP3'),
    'allow_spoofing': fields.Boolean(description='Allow the user to spoof the sender (send email as anyone)'),
    'forward_enabled': fields.Boolean(description='Enable auto forwarding'),
    'forward_destination': fields.List(fields.String(description='Email address to forward emails to'), example=['Other@example.com']),
    'forward_keep': fields.Boolean(description='Keep a copy of the forwarded email in the inbox'),
    'reply_enabled': fields.Boolean(description='Enable automatic replies. This is also known as out of office (ooo) or out of facility (oof) replies'),
    'reply_subject': fields.String(description='Optional subject for the automatic reply', example='Out of office'),
    'reply_body': fields.String(description='The body of the automatic reply email', example='Hello, I am out of office. I will respond when I am back.'),
    'reply_startdate': fields.Date(description='Start date for automatic replies in YYYY-MM-DD format.', example='2022-02-10'),
    'reply_enddate': fields.Date(description='End date for automatic replies in YYYY-MM-DD format.', example='2022-02-22'),
    'displayed_name': fields.String(description='The display name of the user within the Admin GUI', example='John Doe'),
    'spam_enabled': fields.Boolean(description='Enable the spam filter'),
    'spam_mark_as_read': fields.Boolean(description='Enable marking spam mails as read'),
    'spam_threshold': fields.Integer(description='The user defined spam filter tolerance', example=80),
})
|
|
|
|
# Request model documented for user creation (POST).
# 'change_pw_next_login' is listed because the create handler reads that key
# from the payload. Examples are given in the JSON type of their field
# (integers for fields.Integer, a list for fields.List).
user_fields_post = api.model('UserCreate', {
    'email': fields.String(description='The email address of the user', example='John.Doe@example.com', attribute='_email', required=True),
    'raw_password': fields.String(description='The raw (plain text) password of the user. Mailu will hash the password using BCRYPT-SHA256', example='secret', required=True),
    'comment': fields.String(description='A description for the user. This description is shown on the Users page', example='my comment'),
    'quota_bytes': fields.Integer(description='The maximum quota for the user’s email box in bytes', example=1000000000),
    'global_admin': fields.Boolean(description='Make the user a global administrator'),
    'enabled': fields.Boolean(description='Enable the user. When an user is disabled, the user is unable to login to the Admin GUI or webmail or access his email via IMAP/POP3 or send mail'),
    'change_pw_next_login': fields.Boolean(description='Force the user to change their password at next login'),
    'enable_imap': fields.Boolean(description='Allow email retrieval via IMAP'),
    'enable_pop': fields.Boolean(description='Allow email retrieval via POP3'),
    'allow_spoofing': fields.Boolean(description='Allow the user to spoof the sender (send email as anyone)'),
    'forward_enabled': fields.Boolean(description='Enable auto forwarding'),
    'forward_destination': fields.List(fields.String(description='Email address to forward emails to'), example=['Other@example.com']),
    'forward_keep': fields.Boolean(description='Keep a copy of the forwarded email in the inbox'),
    'reply_enabled': fields.Boolean(description='Enable automatic replies. This is also known as out of office (ooo) or out of facility (oof) replies'),
    'reply_subject': fields.String(description='Optional subject for the automatic reply', example='Out of office'),
    'reply_body': fields.String(description='The body of the automatic reply email', example='Hello, I am out of office. I will respond when I am back.'),
    'reply_startdate': fields.Date(description='Start date for automatic replies in YYYY-MM-DD format.', example='2022-02-10'),
    'reply_enddate': fields.Date(description='End date for automatic replies in YYYY-MM-DD format.', example='2022-02-22'),
    'displayed_name': fields.String(description='The display name of the user within the Admin GUI', example='John Doe'),
    'spam_enabled': fields.Boolean(description='Enable the spam filter'),
    'spam_mark_as_read': fields.Boolean(description='Enable marking spam mails as read'),
    'spam_threshold': fields.Integer(description='The user defined spam filter tolerance', example=80),
})
|
|
|
|
# Request model documented for user updates (PATCH); every key is optional.
# Examples are given in the JSON type of their field (integers for
# fields.Integer, a list for fields.List) so the generated documentation
# shows a valid sample document.
user_fields_put = api.model('UserUpdate', {
    'raw_password': fields.String(description='The raw (plain text) password of the user. Mailu will hash the password using BCRYPT-SHA256', example='secret'),
    'comment': fields.String(description='A description for the user. This description is shown on the Users page', example='my comment'),
    'quota_bytes': fields.Integer(description='The maximum quota for the user’s email box in bytes', example=1000000000),
    'global_admin': fields.Boolean(description='Make the user a global administrator'),
    'enabled': fields.Boolean(description='Enable the user. When an user is disabled, the user is unable to login to the Admin GUI or webmail or access his email via IMAP/POP3 or send mail'),
    'change_pw_next_login': fields.Boolean(description='Force the user to change their password at next login'),
    'enable_imap': fields.Boolean(description='Allow email retrieval via IMAP'),
    'enable_pop': fields.Boolean(description='Allow email retrieval via POP3'),
    'allow_spoofing': fields.Boolean(description='Allow the user to spoof the sender (send email as anyone)'),
    'forward_enabled': fields.Boolean(description='Enable auto forwarding'),
    'forward_destination': fields.List(fields.String(description='Email address to forward emails to'), example=['Other@example.com']),
    'forward_keep': fields.Boolean(description='Keep a copy of the forwarded email in the inbox'),
    'reply_enabled': fields.Boolean(description='Enable automatic replies. This is also known as out of office (ooo) or out of facility (oof) replies'),
    'reply_subject': fields.String(description='Optional subject for the automatic reply', example='Out of office'),
    'reply_body': fields.String(description='The body of the automatic reply email', example='Hello, I am out of office. I will respond when I am back.'),
    'reply_startdate': fields.Date(description='Start date for automatic replies in YYYY-MM-DD format.', example='2022-02-10'),
    'reply_enddate': fields.Date(description='End date for automatic replies in YYYY-MM-DD format.', example='2022-02-22'),
    'displayed_name': fields.String(description='The display name of the user within the Admin GUI', example='John Doe'),
    'spam_enabled': fields.Boolean(description='Enable the spam filter'),
    'spam_mark_as_read': fields.Boolean(description='Enable marking spam mails as read'),
    'spam_threshold': fields.Integer(description='The user defined spam filter tolerance', example=80),
})
|
|
|
|
|
|
# Collection endpoint of the 'user' namespace: list all users, create a user.
# NOTE: the method docstrings are left untouched on purpose; flask-restx
# publishes them in the generated API documentation.
@user.route('')
class Users(Resource):
    # Optional payload keys copied onto the new user, in the order they are
    # applied. 'raw_password' is not listed: it goes through set_password().
    _PAYLOAD_FIELDS = (
        'comment',
        'quota_bytes',
        'global_admin',
        'enabled',
        'change_pw_next_login',
        'enable_imap',
        'enable_pop',
        'allow_spoofing',
        'forward_enabled',
        'forward_destination',
        'forward_keep',
        'reply_enabled',
        'reply_subject',
        'reply_body',
        'reply_startdate',
        'reply_enddate',
        'displayed_name',
        'spam_enabled',
        'spam_mark_as_read',
        'spam_threshold',
    )
    # Payload keys holding a YYYY-MM-DD string that is stored as a datetime.
    _DATE_FIELDS = ('reply_startdate', 'reply_enddate')

    @user.doc('list_users')
    @user.marshal_with(user_fields_get, as_list=True, skip_none=True, mask=None)
    @user.doc(security='Bearer')
    @common.api_token_authorization
    def get(self):
        "List users"
        # Marshalling to user_fields_get is done by the marshal_with decorator.
        return models.User.query.all()

    @user.doc('create_user')
    @user.expect(user_fields_post)
    @user.response(200, 'Success', response_fields)
    @user.response(400, 'Input validation exception')
    @user.response(404, 'Domain not found', response_fields)
    @user.response(409, 'Duplicate user', response_fields)
    @user.doc(security='Bearer')
    @common.api_token_authorization
    def post(self):
        """ Create user """
        # Returns a (body, status) tuple:
        #   400 - missing/invalid email address or malformed reply date
        #   404 - the domain part of the address is not a known domain
        #   409 - a user with this address already exists
        #   200 - the user was created and committed
        payload = api.payload
        # A missing or non-object JSON body is treated as an empty payload so
        # the request is answered with 400 below instead of raising.
        data = payload if isinstance(payload, dict) else {}

        email = data.get('email')
        if not isinstance(email, str) or not validators.email(email):
            return { 'code': 400, 'message': f'Provided email address {email} is not a valid email address'}, 400

        # The domain is looked up by the lower-cased part after the last '@'.
        domain_name = email.lower().rsplit('@', 1)[1]
        if not models.Domain.query.get(domain_name):
            return { 'code': 404, 'message': f'Domain {domain_name} does not exist'}, 404

        # Report an existing account as the documented 409 rather than letting
        # the commit fail. The address is looked up exactly as it is handed to
        # models.User below.
        # NOTE(review): assumes the model normalises case the same way for
        # lookups and for storage - confirm against models.User.
        if models.User.query.filter_by(email=email).first() is not None:
            return { 'code': 409, 'message': f'User {email} already exists'}, 409

        # Parse the dates before building the user so that malformed input is
        # rejected with 400 before anything is created.
        parsed_dates = {}
        for field in self._DATE_FIELDS:
            if field not in data:
                continue
            try:
                year, month, day = data[field].split('-')
                parsed_dates[field] = datetime.datetime(int(year), int(month), int(day))
            except (AttributeError, TypeError, ValueError):
                return { 'code': 400, 'message': f'Provided {field} {data[field]} is not a valid date in YYYY-MM-DD format'}, 400

        user_new = models.User(email=email)
        if 'raw_password' in data:
            user_new.set_password(data['raw_password'])
        for field in self._PAYLOAD_FIELDS:
            if field in data:
                # Date fields take their parsed value, everything else is
                # copied from the payload unchanged.
                setattr(user_new, field, parsed_dates.get(field, data[field]))

        db.session.add(user_new)
        db.session.commit()

        return {'code': 200,'message': f'User {email} has been created'}, 200
|
|
|
|
|
|
# Item endpoint of the 'user' namespace: look up, update or delete the user
# identified by the email address in the URL.
# NOTE: the method docstrings are left untouched on purpose; flask-restx
# publishes them in the generated API documentation.
@user.route('/<string:email>')
class User(Resource):
    # Optional payload keys copied onto the user, in the order they are
    # applied. 'raw_password' is not listed: it goes through set_password().
    _PAYLOAD_FIELDS = (
        'comment',
        'quota_bytes',
        'global_admin',
        'enabled',
        'change_pw_next_login',
        'enable_imap',
        'enable_pop',
        'allow_spoofing',
        'forward_enabled',
        'forward_destination',
        'forward_keep',
        'reply_enabled',
        'reply_subject',
        'reply_body',
        'reply_startdate',
        'reply_enddate',
        'displayed_name',
        'spam_enabled',
        'spam_mark_as_read',
        'spam_threshold',
    )
    # Payload keys holding a YYYY-MM-DD string that is stored as a datetime.
    _DATE_FIELDS = ('reply_startdate', 'reply_enddate')

    @user.doc('find_user')
    @user.response(400, 'Input validation exception', response_fields)
    @user.response(404, 'User not found', response_fields)
    @user.doc(security='Bearer')
    @common.api_token_authorization
    def get(self, email):
        """ Find user """
        # Returns (body, status): 400 for an invalid address, 404 when no
        # user matches, otherwise the user marshalled with user_fields_get.
        if not validators.email(email):
            return { 'code': 400, 'message': f'Provided email address {email} is not a valid email address'}, 400

        email_found = models.User.query.filter_by(email=email).first()
        if email_found is None:
            return { 'code': 404, 'message': f'User {email} cannot be found'}, 404
        return marshal(email_found, user_fields_get), 200

    @user.doc('update_user')
    @user.expect(user_fields_put)
    @user.response(200, 'Success', response_fields)
    @user.response(400, 'Input validation exception', response_fields)
    @user.response(404, 'User not found', response_fields)
    @user.response(409, 'Duplicate user', response_fields)
    @user.doc(security='Bearer')
    @common.api_token_authorization
    def patch(self, email):
        """ Update user """
        # Returns (body, status): 400 for an invalid address or malformed
        # reply date, 404 when the user does not exist, 200 once the changes
        # have been committed. Only keys present in the payload are applied.
        payload = api.payload
        # A missing or non-object JSON body is treated as "nothing to change".
        data = payload if isinstance(payload, dict) else {}

        if not validators.email(email):
            # The address comes from the URL; the update payload has no
            # 'email' key, so the message must not read it from there.
            return { 'code': 400, 'message': f'Provided email address {email} is not a valid email address'}, 400
        user_found = models.User.query.get(email)
        if not user_found:
            return {'code': 404, 'message': f'User {email} cannot be found'}, 404

        # Parse the dates before touching the loaded user so that malformed
        # input is rejected with 400 without leaving it half-modified.
        parsed_dates = {}
        for field in self._DATE_FIELDS:
            if field not in data:
                continue
            try:
                year, month, day = data[field].split('-')
                parsed_dates[field] = datetime.datetime(int(year), int(month), int(day))
            except (AttributeError, TypeError, ValueError):
                return { 'code': 400, 'message': f'Provided {field} {data[field]} is not a valid date in YYYY-MM-DD format'}, 400

        if 'raw_password' in data:
            user_found.set_password(data['raw_password'], keep_only_session='')
        for field in self._PAYLOAD_FIELDS:
            if field in data:
                # Date fields take their parsed value, everything else is
                # copied from the payload unchanged.
                setattr(user_found, field, parsed_dates.get(field, data[field]))

        db.session.add(user_found)
        db.session.commit()

        return {'code': 200,'message': f'User {email} has been updated'}, 200

    @user.doc('delete_user')
    @user.response(200, 'Success', response_fields)
    @user.response(400, 'Input validation exception', response_fields)
    @user.response(404, 'User not found', response_fields)
    @user.doc(security='Bearer')
    @common.api_token_authorization
    def delete(self, email):
        """ Delete user """
        # Returns (body, status): 400 for an invalid address, 404 when no
        # user matches, 200 once the deletion has been committed.
        if not validators.email(email):
            return { 'code': 400, 'message': f'Provided email address {email} is not a valid email address'}, 400

        email_found = models.User.query.filter_by(email=email).first()
        if email_found is None:
            return { 'code': 404, 'message': f'User {email} cannot be found'}, 404
        db.session.delete(email_found)
        db.session.commit()
        return { 'code': 200, 'message': f'User {email} has been deleted'}, 200
|