1
0
mirror of https://github.com/Mailu/Mailu.git synced 2025-01-30 04:30:49 +02:00

RESTful API bugfix for domains. Add authentication tokens to API

This commit is contained in:
Dimitri Huisman 2023-10-09 16:19:20 +00:00
parent 098cf110c8
commit 3d9a8bc21f
No known key found for this signature in database
3 changed files with 182 additions and 9 deletions

View File

@ -37,7 +37,8 @@ error_fields = api.model('Error', {
'message': fields.String,
})
from . import domains
from . import domain
from . import alias
from . import relay
from . import user
from . import token

View File

@ -115,13 +115,13 @@ class Domains(Resource):
if 'comment' in data:
domain_new.comment = data['comment']
if 'max_users' in data:
domain_new.comment = data['max_users']
domain_new.max_users = data['max_users']
if 'max_aliases' in data:
domain_new.comment = data['max_aliases']
domain_new.max_aliases = data['max_aliases']
if 'max_quota_bytes' in data:
domain_new.comment = data['max_quota_bytes']
domain_new.max_quota_bytes = data['max_quota_bytes']
if 'signup_enabled' in data:
domain_new.comment = data['signup_enabled']
domain_new.signup_enabled = data['signup_enabled']
models.db.session.add(domain_new)
#apply the changes
db.session.commit()
@ -177,13 +177,13 @@ class Domain(Resource):
if 'comment' in data:
domain_found.comment = data['comment']
if 'max_users' in data:
domain_found.comment = data['max_users']
domain_found.max_users = data['max_users']
if 'max_aliases' in data:
domain_found.comment = data['max_aliases']
domain_found.max_aliases = data['max_aliases']
if 'max_quota_bytes' in data:
domain_found.comment = data['max_quota_bytes']
domain_found.max_quota_bytes = data['max_quota_bytes']
if 'signup_enabled' in data:
domain_found.comment = data['signup_enabled']
domain_found.signup_enabled = data['signup_enabled']
models.db.session.add(domain_found)
#apply the changes

View File

@ -0,0 +1,172 @@
from flask_restx import Resource, fields, marshal
import validators, datetime
import flask
from passlib import pwd
from . import api, response_fields
from .. import common
from ... import models
db = models.db
token = api.namespace('token', description='Token operations')
token_user_fields = api.model('TokenGetResponse', {
'id': fields.String(description='The record id of the token (unique identifier)', example='1'),
'email': fields.String(description='The email address of the user', example='John.Doe@example.com', attribute='user_email'),
'comment': fields.String(description='A description for the token. This description is shown on the Authentication tokens page', example='my comment'),
'AuthorizedIP': fields.String(description='Comma separated list of white listed IP addresses or networks that may use this token', example='88.77.66.55', attribute='ip'),
'Created': fields.String(description='The date when the token was created', example='John.Doe@example.com', attribute='created_at'),
'Last edit': fields.String(description='The date when the token was last modifified', example='John.Doe@example.com', attribute='updated_at')
})
token_user_fields_post = api.model('TokenPost', {
'email': fields.String(description='The email address of the user', example='John.Doe@example.com', attribute='user_email'),
'comment': fields.String(description='A description for the token. This description is shown on the Authentication tokens page', example='my comment'),
'AuthorizedIP': fields.String(description='Comma separated list of white listed IP addresses or networks that may use this token', example='88.77.66.55', attribute='ip')
})
token_user_fields_post2 = api.model('TokenPost2', {
'comment': fields.String(description='A description for the token. This description is shown on the Authentication tokens page', example='my comment'),
'AuthorizedIP': fields.String(description='Comma separated list of white listed IP addresses or networks that may use this token', example='88.77.66.55', attribute='ip')
})
token_user_post_response = api.model('TokenPostResponse', {
'id': fields.String(description='The record id of the token (unique identifier)', example='1'),
'token': fields.String(description='The created authentication token for the user.', example='2caf6607de5129e4748a2c061aee56f2', attribute='password'),
'email': fields.String(description='The email address of the user', example='John.Doe@example.com', attribute='user_email'),
'comment': fields.String(description='A description for the token. This description is shown on the Authentication tokens page', example='my comment'),
'AuthorizedIP': fields.String(description='Comma separated list of white listed IP addresses or networks that may use this token', example='88.77.66.55', attribute='ip'),
'Created': fields.String(description='The date when the token was created', example='John.Doe@example.com', attribute='created_at')
})
@token.route('')
class Tokens(Resource):
@token.doc('list_tokens')
@token.marshal_with(token_user_fields, as_list=True, skip_none=True, mask=None)
@token.doc(security='Bearer')
@common.api_token_authorization
def get(self):
"""List tokens"""
return models.Token.query.all()
@token.doc('create_token')
@token.expect(token_user_fields_post)
@token.response(200, 'Success', token_user_post_response)
@token.response(400, 'Input validation exception')
@token.response(409, 'Duplicate relay', response_fields)
@token.doc(security='Bearer')
@common.api_token_authorization
def post(self):
""" Create a new token"""
data = api.payload
email = data['email']
if not validators.email(email):
return { 'code': 400, 'message': f'Provided email address {email} is not a valid email address'}, 400
user_found = models.User.query.get(email)
if not user_found:
return {'code': 404, 'message': f'User {email} cannot be found'}, 404
tokens = user_found.tokens
token_new = models.Token(user_email=data['email'])
if 'comment' in data:
token_new.comment = data['comment']
if 'AuthorizedIP' in data:
token_new.ip = data['AuthorizedIP'].replace(' ','').split(',')
raw_password = pwd.genword(entropy=128, length=32, charset="hex")
token_new.set_password(raw_password)
models.db.session.add(token_new)
#apply the changes
db.session.commit()
flask.current_app.logger.info(f'token_new.id == {token_new.id}.')
response_dict = {
'id' : token_new.id,
'token' : raw_password,
'email' : token_new.user_email,
'comment' : token_new.comment,
'AuthorizedIP' : token_new.ip,
'Created': str(token_new.created_at),
}
return response_dict
@token.route('user/<string:email>')
class Token(Resource):
@token.doc('find_tokens_of_user')
@token.marshal_with(token_user_fields, as_list=True, skip_none=True, mask=None)
@token.doc(security='Bearer')
@common.api_token_authorization
def get(self, email):
"Find tokens of user"
if not validators.email(email):
return { 'code': 400, 'message': f'Provided email address {email} is not a valid email address'}, 400
user_found = models.User.query.get(email)
if not user_found:
return {'code': 404, 'message': f'User {email} cannot be found'}, 404
tokens = user_found.tokens
return tokens
@token.doc('create_token')
@token.expect(token_user_fields_post2)
@token.response(200, 'Success', token_user_post_response)
@token.response(400, 'Input validation exception')
@token.response(409, 'Duplicate relay', response_fields)
@token.doc(security='Bearer')
@common.api_token_authorization
def post(self, email):
""" Create a new token for user"""
data = api.payload
if not validators.email(email):
return { 'code': 400, 'message': f'Provided email address {email} is not a valid email address'}, 400
user_found = models.User.query.get(email)
if not user_found:
return {'code': 404, 'message': f'User {email} cannot be found'}, 404
token_new = models.Token(user_email=email)
if 'comment' in data:
token_new.comment = data['comment']
if 'AuthorizedIP' in data:
token_new.ip = token_new.ip = data['AuthorizedIP'].replace(' ','').split(',')
raw_password = pwd.genword(entropy=128, length=32, charset="hex")
token_new.set_password(raw_password)
models.db.session.add(token_new)
#apply the changes
db.session.commit()
flask.current_app.logger.info(f'token_new.id == {token_new.id}.')
response_dict = {
'id' : token_new.id,
'token' : raw_password,
'email' : token_new.user_email,
'comment' : token_new.comment,
'AuthorizedIP' : token_new.ip,
'Created': str(token_new.created_at),
}
return response_dict
@token.route('/<string:token_id>')
class Token(Resource):
@token.doc('find_token')
@token.marshal_with(token_user_fields, as_list=True, skip_none=True, mask=None)
@token.doc(security='Bearer')
@common.api_token_authorization
def get(self, token_id):
"Find token"
token = models.Token.query.get(token_id)
if not token:
return { 'code' : 404, 'message' : f'Record cannot be found for id {token_id} or invalid id provided'}, 404
return token
@token.doc('delete_token')
@token.response(200, 'Success', response_fields)
@token.response(400, 'Input validation exception', response_fields)
@token.response(404, 'Token not found', response_fields)
@token.doc(security='Bearer')
@common.api_token_authorization
def delete(self, token_id):
""" Delete token """
token = models.Token.query.get(token_id)
if not token:
return { 'code' : 404, 'message' : f'Record cannot be found for id {token_id} or invalid id provided'}, 404
db.session.delete(token)
db.session.commit()
return {'code': 200, 'message': f'Token with id {token_id} has been deleted'}, 200