1
0
mirror of https://github.com/Mailu/Mailu.git synced 2025-08-10 22:31:47 +02:00
3203: Add automatic tests for RESTful API r=mergify[bot] a=Diman0

and fix all remaining issues that I could find with the API.

## What type of PR?

internal feature / bug-fix

## What does this PR do?
I first wanted to finish #3113 before continuing on the tests to keep the scope smaller of the PR.

This PR adds automatic tests that tests **all** the interfaces of the RESTful API. Practically it only tests the normal Ok (http 200) situations. Maybe in the future we could add more tests to check if the validation checks work correctly for each interface.

I also fixed any issues I could find with the RESTful API. I can at least confirm that all interfaces work now. I think the validation checks are also complete now.

### Related issue(s)

## Prerequisites
Before we can consider review and merge, please make sure the following list is done and checked.
If an entry in not applicable, you can check it or remove it from the list.

- [n/a] In case of feature or enhancement: documentation updated accordingly
- [n/a] Unless it's docs or a minor change: add [changelog](https://mailu.io/master/contributors/workflow.html#changelog) entry file.


Co-authored-by: Dimitri Huisman <diman@huisman.xyz>
This commit is contained in:
bors-mailu[bot]
2024-04-17 18:09:31 +00:00
committed by GitHub
15 changed files with 1112 additions and 50 deletions

View File

@@ -2,6 +2,7 @@ from flask_restx import Resource, fields, marshal
from . import api, response_fields
from .. import common
from ... import models
import validators
db = models.db
@@ -15,7 +16,7 @@ alias_fields_update = alias.model('AliasUpdate', {
alias_fields = alias.inherit('Alias',alias_fields_update, {
'email': fields.String(description='the alias email address', example='user@example.com', required=True),
'destination': fields.List(fields.String(description='alias email address', example='user@example.com', required=True)),
'destination': fields.List(fields.String(description='destination email address', example='user@example.com', required=True)),
})
@@ -24,6 +25,7 @@ alias_fields = alias.inherit('Alias',alias_fields_update, {
class Aliases(Resource):
@alias.doc('list_alias')
@alias.marshal_with(alias_fields, as_list=True, skip_none=True, mask=None)
@alias.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@alias.doc(security='Bearer')
@common.api_token_authorization
def get(self):
@@ -34,6 +36,8 @@ class Aliases(Resource):
@alias.expect(alias_fields)
@alias.response(200, 'Success', response_fields)
@alias.response(400, 'Input validation exception', response_fields)
@alias.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@alias.response(404, 'Not found', response_fields)
@alias.response(409, 'Duplicate alias', response_fields)
@alias.doc(security='Bearer')
@common.api_token_authorization
@@ -41,6 +45,20 @@ class Aliases(Resource):
""" Create a new alias """
data = api.payload
if not validators.email(data['email']):
return { 'code': 400, 'message': f'Provided alias {data["email"]} is not a valid email address'}, 400
localpart, domain_name = data['email'].lower().rsplit('@', 1)
domain_found = models.Domain.query.get(domain_name)
if not domain_found:
return { 'code': 404, 'message': f'Domain {domain_name} does not exist ({data["email"]})'}, 404
if not domain_found.max_aliases == -1 and len(domain_found.aliases) >= domain_found.max_aliases:
return { 'code': 409, 'message': f'Too many aliases for domain {domain_name}'}, 409
for dest in data['destination']:
if not validators.email(dest):
return { 'code': 400, 'message': f'Provided destination email address {dest} is not a valid email address'}, 400
elif models.User.query.filter_by(email=dest).first() is None:
return { 'code': 404, 'message': f'Provided destination email address {dest} does not exist'}, 404
alias_found = models.Alias.query.filter_by(email = data['email']).first()
if alias_found:
return { 'code': 409, 'message': f'Duplicate alias {data["email"]}'}, 409
@@ -53,17 +71,21 @@ class Aliases(Resource):
db.session.add(alias_model)
db.session.commit()
return {'code': 200, 'message': f'Alias {data["email"]} to destination {data["destination"]} has been created'}, 200
return {'code': 200, 'message': f'Alias {data["email"]} to destination(s) {data["destination"]} has been created'}, 200
@alias.route('/<string:alias>')
class Alias(Resource):
@alias.doc('find_alias')
@alias.response(200, 'Success', alias_fields)
@alias.response(400, 'Input validation exception', response_fields)
@alias.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@alias.response(404, 'Alias not found', response_fields)
@alias.doc(security='Bearer')
@common.api_token_authorization
def get(self, alias):
""" Look up the specified alias """
if not validators.email(alias):
return { 'code': 400, 'message': f'Provided alias (email address) {alias} is not a valid email address'}, 400
alias_found = models.Alias.query.filter_by(email = alias).first()
if alias_found is None:
return { 'code': 404, 'message': f'Alias {alias} cannot be found'}, 404
@@ -73,6 +95,7 @@ class Alias(Resource):
@alias.doc('update_alias')
@alias.expect(alias_fields_update)
@alias.response(200, 'Success', response_fields)
@alias.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@alias.response(404, 'Alias not found', response_fields)
@alias.response(400, 'Input validation exception', response_fields)
@alias.doc(security='Bearer')
@@ -80,6 +103,9 @@ class Alias(Resource):
def patch(self, alias):
""" Update the specfied alias """
data = api.payload
if not validators.email(alias):
return { 'code': 400, 'message': f'Provided alias (email address) {alias} is not a valid email address'}, 400
alias_found = models.Alias.query.filter_by(email = alias).first()
if alias_found is None:
return { 'code': 404, 'message': f'Alias {alias} cannot be found'}, 404
@@ -87,6 +113,11 @@ class Alias(Resource):
alias_found.comment = data['comment']
if 'destination' in data:
alias_found.destination = data['destination']
for dest in data['destination']:
if not validators.email(dest):
return { 'code': 400, 'message': f'Provided destination email address {dest} is not a valid email address'}, 400
elif models.User.query.filter_by(email=dest).first() is None:
return { 'code': 404, 'message': f'Provided destination email address {dest} does not exist'}, 404
if 'wildcard' in data:
alias_found.wildcard = data['wildcard']
db.session.add(alias_found)
@@ -95,11 +126,15 @@ class Alias(Resource):
@alias.doc('delete_alias')
@alias.response(200, 'Success', response_fields)
@alias.response(400, 'Input validation exception', response_fields)
@alias.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@alias.response(404, 'Alias not found', response_fields)
@alias.doc(security='Bearer')
@common.api_token_authorization
def delete(self, alias):
""" Delete the specified alias """
if not validators.email(alias):
return { 'code': 400, 'message': f'Provided alias (email address) {alias} is not a valid email address'}, 400
alias_found = models.Alias.query.filter_by(email = alias).first()
if alias_found is None:
return { 'code': 404, 'message': f'Alias {alias} cannot be found'}, 404
@@ -110,12 +145,16 @@ class Alias(Resource):
@alias.route('/destination/<string:domain>')
class AliasWithDest(Resource):
@alias.doc('find_alias_filter_domain')
@alias.marshal_with(alias_fields, code=200, description='Success' ,as_list=True, skip_none=True, mask=None)
@alias.response(200, 'Success', alias_fields)
@alias.response(400, 'Input validation exception', response_fields)
@alias.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@alias.response(404, 'Alias or domain not found', response_fields)
@alias.doc(security='Bearer')
@common.api_token_authorization
def get(self, domain):
""" Look up the aliases of the specified domain """
if not validators.domain(domain):
return { 'code': 400, 'message': f'Domain {domain} is not a valid domain'}, 400
domain_found = models.Domain.query.filter_by(name=domain).first()
if domain_found is None:
return { 'code': 404, 'message': f'Domain {domain} cannot be found'}, 404
@@ -123,4 +162,4 @@ class AliasWithDest(Resource):
if aliases_found.count == 0:
return { 'code': 404, 'message': f'No alias can be found for domain {domain}'}, 404
else:
return marshal(aliases_found, alias_fields, as_list=True), 200
return marshal(aliases_found, alias_fields), 200

View File

@@ -16,7 +16,7 @@ domain_fields = api.model('Domain', {
'max_aliases': fields.Integer(description='maximum number of aliases', min=-1, default=-1),
'max_quota_bytes': fields.Integer(description='maximum quota for mailbox', min=0),
'signup_enabled': fields.Boolean(description='allow signup'),
'alternatives': fields.List(fields.String(attribute='name', description='FQDN', example='example2.com')),
'alternatives': fields.List(fields.String(attribute='name', description='FQDN'), example='["example.com"]'),
})
domain_fields_update = api.model('DomainUpdate', {
@@ -25,17 +25,18 @@ domain_fields_update = api.model('DomainUpdate', {
'max_aliases': fields.Integer(description='maximum number of aliases', min=-1, default=-1),
'max_quota_bytes': fields.Integer(description='maximum quota for mailbox', min=0),
'signup_enabled': fields.Boolean(description='allow signup'),
'alternatives': fields.List(fields.String(attribute='name', description='FQDN', example='example2.com')),
'alternatives': fields.List(fields.String(attribute='name', description='FQDN'), example='["example.com"]'),
})
domain_fields_get = api.model('DomainGet', {
'name': fields.String(description='FQDN (e.g. example.com)', example='example.com', required=True),
'comment': fields.String(description='a comment'),
'managers': fields.List(fields.String(attribute='email', description='manager of domain')),
'max_users': fields.Integer(description='maximum number of users', min=-1, default=-1),
'max_aliases': fields.Integer(description='maximum number of aliases', min=-1, default=-1),
'max_quota_bytes': fields.Integer(description='maximum quota for mailbox', min=0),
'signup_enabled': fields.Boolean(description='allow signup'),
'alternatives': fields.List(fields.String(attribute='name', description='FQDN', example='example2.com')),
'alternatives': fields.List(fields.String(attribute='name', description='FQDN'), example='["example.com"]'),
'dns_autoconfig': fields.List(fields.String(description='DNS client auto-configuration entry')),
'dns_mx': fields.String(Description='MX record for domain'),
'dns_spf': fields.String(Description='SPF record for domain'),
@@ -56,8 +57,7 @@ domain_fields_dns = api.model('DomainDNS', {
})
manager_fields = api.model('Manager', {
'domain_name': fields.String(description='domain managed by manager'),
'user_email': fields.String(description='email address of manager'),
'managers': fields.List(fields.String(attribute='email', description='manager of domain')),
})
manager_fields_create = api.model('ManagerCreate', {
@@ -78,6 +78,7 @@ alternative_fields = api.model('AlternativeDomain', {
class Domains(Resource):
@dom.doc('list_domain')
@dom.marshal_with(domain_fields_get, as_list=True, skip_none=True, mask=None)
@dom.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@dom.doc(security='Bearer')
@common.api_token_authorization
def get(self):
@@ -88,6 +89,7 @@ class Domains(Resource):
@dom.expect(domain_fields)
@dom.response(200, 'Success', response_fields)
@dom.response(400, 'Input validation exception', response_fields)
@dom.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@dom.response(409, 'Duplicate domain/alternative name', response_fields)
@dom.doc(security='Bearer')
@common.api_token_authorization
@@ -131,15 +133,16 @@ class Domains(Resource):
class Domain(Resource):
@dom.doc('find_domain')
@dom.marshal_with(domain_fields, code=200, description='Success', as_list=False, skip_none=True, mask=None)
@dom.response(200, 'Success', domain_fields_get)
@dom.response(400, 'Input validation exception', response_fields)
@dom.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@dom.response(404, 'Domain not found', response_fields)
@dom.doc(security='Bearer')
@common.api_token_authorization
def get(self, domain):
""" Look up the specified domain """
if not validators.domain(domain):
return { 'code': 400, 'message': f'Domain {domain} is not a valid domain'}, 400
return { 'code': 400, 'message': f'Domain {domain} is not a valid domain'}, 200
domain_found = models.Domain.query.get(domain)
if not domain_found:
return { 'code': 404, 'message': f'Domain {domain} does not exist'}, 404
@@ -149,6 +152,7 @@ class Domain(Resource):
@dom.expect(domain_fields_update)
@dom.response(200, 'Success', response_fields)
@dom.response(400, 'Input validation exception', response_fields)
@dom.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@dom.response(404, 'Domain not found', response_fields)
@dom.response(409, 'Duplicate domain/alternative name', response_fields)
@dom.doc(security='Bearer')
@@ -158,7 +162,7 @@ class Domain(Resource):
if not validators.domain(domain):
return { 'code': 400, 'message': f'Domain {domain} is not a valid domain'}, 400
domain_found = models.Domain.query.get(domain)
if not domain:
if not domain_found:
return { 'code': 404, 'message': f'Domain {data["name"]} does not exist'}, 404
data = api.payload
@@ -172,7 +176,7 @@ class Domain(Resource):
if not validators.domain(item):
return { 'code': 400, 'message': f'Alternative domain {item} is not a valid domain'}, 400
for item in data['alternatives']:
alternative = models.Alternative(name=item, domain_name=data['name'])
alternative = models.Alternative(name=item, domain_name=domain)
models.db.session.add(alternative)
if 'comment' in data:
@@ -194,6 +198,7 @@ class Domain(Resource):
@dom.doc('delete_domain')
@dom.response(200, 'Success', response_fields)
@dom.response(400, 'Input validation exception', response_fields)
@dom.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@dom.response(404, 'Domain not found', response_fields)
@dom.doc(security='Bearer')
@common.api_token_authorization
@@ -202,7 +207,7 @@ class Domain(Resource):
if not validators.domain(domain):
return { 'code': 400, 'message': f'Domain {domain} is not a valid domain'}, 400
domain_found = models.Domain.query.get(domain)
if not domain:
if not domain_found:
return { 'code': 404, 'message': f'Domain {domain} does not exist'}, 404
db.session.delete(domain_found)
db.session.commit()
@@ -213,6 +218,7 @@ class Domain(Resource):
@dom.doc('generate_dkim')
@dom.response(200, 'Success', response_fields)
@dom.response(400, 'Input validation exception', response_fields)
@dom.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@dom.response(404, 'Domain not found', response_fields)
@dom.doc(security='Bearer')
@common.api_token_authorization
@@ -230,8 +236,9 @@ class Domain(Resource):
@dom.route('/<domain>/manager')
class Manager(Resource):
@dom.doc('list_managers')
@dom.marshal_with(manager_fields, code=200, description='Success', as_list=True, skip_none=True, mask=None)
@dom.response(200, 'Success', manager_fields)
@dom.response(400, 'Input validation exception', response_fields)
@dom.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@dom.response(404, 'domain not found', response_fields)
@dom.doc(security='Bearer')
@common.api_token_authorization
@@ -239,15 +246,16 @@ class Manager(Resource):
""" List all managers of the specified domain """
if not validators.domain(domain):
return { 'code': 400, 'message': f'Domain {domain} is not a valid domain'}, 400
if not domain:
domain_found = models.Domain.query.get(domain)
if not domain_found:
return { 'code': 404, 'message': f'Domain {domain} does not exist'}, 404
domain = models.Domain.query.filter_by(name=domain)
return domain.managers
return marshal(domain_found, manager_fields), 200
@dom.doc('create_manager')
@dom.expect(manager_fields_create)
@dom.response(200, 'Success', response_fields)
@dom.response(400, 'Input validation exception', response_fields)
@dom.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@dom.response(404, 'User or domain not found', response_fields)
@dom.response(409, 'Duplicate domain manager', response_fields)
@dom.doc(security='Bearer')
@@ -274,13 +282,14 @@ class Manager(Resource):
@dom.route('/<domain>/manager/<email>')
class Domain(Resource):
@dom.doc('find_manager')
@dom.marshal_with(manager_fields, code=200, description='Success', as_list=False, skip_none=True, mask=None)
@dom.response(200, 'Success', response_fields)
@dom.response(400, 'Input validation exception', response_fields)
@dom.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@dom.response(404, 'Manager not found', response_fields)
@dom.doc(security='Bearer')
@common.api_token_authorization
def get(self, domain, email):
""" Look up the specified manager of the specified domain """
""" Check if the specified user is a manager of the specified domain """
if not validators.email(email):
return {'code': 400, 'message': f'Invalid email address {email}'}, 400
if not validators.domain(domain):
@@ -294,7 +303,7 @@ class Domain(Resource):
if user in domain.managers:
for manager in domain.managers:
if manager.email == email:
return marshal(manager, manager_fields),200
return { 'code': 200, 'message': f'User {email} is a manager of the domain {domain}'}, 200
else:
return { 'code': 404, 'message': f'User {email} is not a manager of the domain {domain}'}, 404
@@ -302,6 +311,7 @@ class Domain(Resource):
@dom.doc('delete_manager')
@dom.response(200, 'Success', response_fields)
@dom.response(400, 'Input validation exception', response_fields)
@dom.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@dom.response(404, 'Manager not found', response_fields)
@dom.doc(security='Bearer')
@common.api_token_authorization
@@ -327,8 +337,9 @@ class Domain(Resource):
@dom.route('/<domain>/users')
class User(Resource):
@dom.doc('list_user_domain')
@dom.marshal_with(user.user_fields_get, code=200, description='Success', as_list=True, skip_none=True, mask=None)
@dom.response(200, 'Success', user.user_fields_get)
@dom.response(400, 'Input validation exception', response_fields)
@dom.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@dom.response(404, 'Domain not found', response_fields)
@dom.doc(security='Bearer')
@common.api_token_authorization
@@ -339,13 +350,14 @@ class User(Resource):
domain_found = models.Domain.query.get(domain)
if not domain_found:
return { 'code': 404, 'message': f'Domain {domain} does not exist'}, 404
return models.User.query.filter_by(domain=domain_found).all()
return marshal(models.User.query.filter_by(domain=domain_found).all(), user.user_fields_get),200
@alt.route('')
class Alternatives(Resource):
@alt.doc('list_alternative')
@alt.marshal_with(alternative_fields, as_list=True, skip_none=True, mask=None)
@alt.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@alt.doc(security='Bearer')
@common.api_token_authorization
def get(self):
@@ -357,6 +369,7 @@ class Alternatives(Resource):
@alt.expect(alternative_fields)
@alt.response(200, 'Success', response_fields)
@alt.response(400, 'Input validation exception', response_fields)
@alt.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@alt.response(404, 'Domain not found or missing', response_fields)
@alt.response(409, 'Duplicate alternative domain name', response_fields)
@alt.doc(security='Bearer')
@@ -383,8 +396,9 @@ class Alternatives(Resource):
class Alternative(Resource):
@alt.doc('find_alternative')
@alt.doc(security='Bearer')
@alt.marshal_with(alternative_fields, code=200, description='Success' ,as_list=True, skip_none=True, mask=None)
@alt.response(200, 'Success', alternative_fields)
@alt.response(400, 'Input validation exception', response_fields)
@alt.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@alt.response(404, 'Alternative not found or missing', response_fields)
@common.api_token_authorization
def get(self, alt):
@@ -399,6 +413,7 @@ class Alternative(Resource):
@alt.doc('delete_alternative')
@alt.response(200, 'Success', response_fields)
@alt.response(400, 'Input validation exception', response_fields)
@alt.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@alt.response(404, 'Alternative/Domain not found or missing', response_fields)
@alt.response(409, 'Duplicate domain name', response_fields)
@alt.doc(security='Bearer')

View File

@@ -24,6 +24,7 @@ relay_fields_update = api.model('RelayUpdate', {
class Relays(Resource):
@relay.doc('list_relays')
@relay.marshal_with(relay_fields, as_list=True, skip_none=True, mask=None)
@relay.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@relay.doc(security='Bearer')
@common.api_token_authorization
def get(self):
@@ -34,6 +35,7 @@ class Relays(Resource):
@relay.expect(relay_fields)
@relay.response(200, 'Success', response_fields)
@relay.response(400, 'Input validation exception', response_fields)
@relay.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@relay.response(409, 'Duplicate relay', response_fields)
@relay.doc(security='Bearer')
@common.api_token_authorization
@@ -58,8 +60,9 @@ class Relays(Resource):
@relay.route('/<string:name>')
class Relay(Resource):
@relay.doc('find_relay')
@relay.marshal_with(relay_fields, code=200, description='Success', as_list=False, skip_none=True, mask=None)
@relay.response(200, 'Success', relay_fields)
@relay.response(400, 'Input validation exception', response_fields)
@relay.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@relay.response(404, 'Relay not found', response_fields)
@relay.doc(security='Bearer')
@common.api_token_authorization
@@ -77,6 +80,7 @@ class Relay(Resource):
@relay.expect(relay_fields_update)
@relay.response(200, 'Success', response_fields)
@relay.response(400, 'Input validation exception', response_fields)
@relay.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@relay.response(404, 'Relay not found', response_fields)
@relay.doc(security='Bearer')
@common.api_token_authorization
@@ -103,6 +107,7 @@ class Relay(Resource):
@relay.doc('delete_relay')
@relay.response(200, 'Success', response_fields)
@relay.response(400, 'Input validation exception', response_fields)
@relay.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@relay.response(404, 'Relay not found', response_fields)
@relay.doc(security='Bearer')
@common.api_token_authorization

View File

@@ -15,20 +15,20 @@ token_user_fields = api.model('TokenGetResponse', {
'id': fields.String(description='The record id of the token (unique identifier)', example='1'),
'email': fields.String(description='The email address of the user', example='John.Doe@example.com', attribute='user_email'),
'comment': fields.String(description='A description for the token. This description is shown on the Authentication tokens page', example='my comment'),
'AuthorizedIP': fields.String(description='Comma separated list of white listed IP addresses or networks that may use this token.', example="['203.0.113.0/24']", attribute='ip'),
'AuthorizedIP': fields.List(fields.String(description='White listed IP addresses or networks that may use this token.', example="203.0.113.0/24"), attribute='ip'),
'Created': fields.String(description='The date when the token was created', example='John.Doe@example.com', attribute='created_at'),
'Last edit': fields.String(description='The date when the token was last modifified', example='John.Doe@example.com', attribute='updated_at')
})
token_user_fields_post = api.model('TokenPost', {
'email': fields.String(description='The email address of the user', example='John.Doe@example.com', attribute='user_email'),
'email': fields.String(description='The email address of the user', example='John.Doe@example.com', attribute='user_email', required=True),
'comment': fields.String(description='A description for the token. This description is shown on the Authentication tokens page', example='my comment'),
'AuthorizedIP': fields.String(description='Comma separated list of white listed IP addresses or networks that may use this token.', example="['203.0.113.0/24']", attribute='ip'),
'AuthorizedIP': fields.List(fields.String(description='White listed IP addresses or networks that may use this token.', example="203.0.113.0/24")),
})
token_user_fields_post2 = api.model('TokenPost2', {
'comment': fields.String(description='A description for the token. This description is shown on the Authentication tokens page', example='my comment'),
'AuthorizedIP': fields.String(description='Comma separated list of white listed IP addresses or networks that may use this token.', example="['203.0.113.0/24']", attribute='ip'),
'AuthorizedIP': fields.List(fields.String(description='White listed IP addresses or networks that may use this token.', example="203.0.113.0/24")),
})
token_user_post_response = api.model('TokenPostResponse', {
@@ -36,14 +36,17 @@ token_user_post_response = api.model('TokenPostResponse', {
'token': fields.String(description='The created authentication token for the user.', example='2caf6607de5129e4748a2c061aee56f2', attribute='password'),
'email': fields.String(description='The email address of the user', example='John.Doe@example.com', attribute='user_email'),
'comment': fields.String(description='A description for the token. This description is shown on the Authentication tokens page', example='my comment'),
'AuthorizedIP': fields.String(description='Comma separated list of white listed IP addresses or networks that may use this token.', example="['203.0.113.0/24']", attribute='ip'),
'AuthorizedIP': fields.List(fields.String(description='White listed IP addresses or networks that may use this token.', example="203.0.113.0/24")),
'Created': fields.String(description='The date when the token was created', example='John.Doe@example.com', attribute='created_at')
})
@token.route('')
class Tokens(Resource):
@token.doc('list_tokens')
@token.marshal_with(token_user_fields, as_list=True, skip_none=True, mask=None)
@token.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@token.doc(security='Bearer')
@common.api_token_authorization
def get(self):
@@ -52,8 +55,10 @@ class Tokens(Resource):
@token.doc('create_token')
@token.expect(token_user_fields_post)
@token.marshal_with(token_user_post_response, code=200, description='Success', as_list=False, skip_none=True, mask=None)
@token.response(200, 'Success', token_user_post_response)
@token.response(400, 'Input validation exception', response_fields)
@token.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@token.response(404, 'User not found', response_fields)
@token.doc(security='Bearer')
@common.api_token_authorization
def post(self):
@@ -71,7 +76,11 @@ class Tokens(Resource):
if 'comment' in data:
token_new.comment = data['comment']
if 'AuthorizedIP' in data:
token_new.ip = data['AuthorizedIP'].replace(' ','').split(',')
token_new.ip = data['AuthorizedIP']
for ip in token_new.ip:
if (not validators.ip_address.ipv4(ip,cidr=True, strict=False, host_bit=False) and
not validators.ip_address.ipv6(ip,cidr=True, strict=False, host_bit=False)):
return { 'code': 400, 'message': f'Provided AuthorizedIP {ip} in {token_new.ip} is invalid'}, 400
raw_password = pwd.genword(entropy=128, length=32, charset="hex")
token_new.set_password(raw_password)
models.db.session.add(token_new)
@@ -91,8 +100,9 @@ class Tokens(Resource):
@token.route('user/<string:email>')
class Token(Resource):
@token.doc('find_tokens_of_user')
@token.marshal_with(token_user_fields, code=200, description='Success', as_list=True, skip_none=True, mask=None)
@token.response(200, 'Success', token_user_fields)
@token.response(400, 'Input validation exception', response_fields)
@token.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@token.response(404, 'Token not found', response_fields)
@token.doc(security='Bearer')
@common.api_token_authorization
@@ -104,12 +114,25 @@ class Token(Resource):
if not user_found:
return {'code': 404, 'message': f'User {email} cannot be found'}, 404
tokens = user_found.tokens
return tokens
response_list = []
for token in tokens:
response_dict = {
'id' : token.id,
'email' : token.user_email,
'comment' : token.comment,
'AuthorizedIP' : token.ip,
'Created': str(token.created_at),
'Last edit': str(token.updated_at)
}
response_list.append(response_dict)
return response_list
@token.doc('create_token')
@token.expect(token_user_fields_post2)
@token.response(200, 'Success', token_user_post_response)
@token.response(400, 'Input validation exception', response_fields)
@token.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@token.response(404, 'User not found', response_fields)
@token.doc(security='Bearer')
@common.api_token_authorization
def post(self, email):
@@ -125,7 +148,11 @@ class Token(Resource):
if 'comment' in data:
token_new.comment = data['comment']
if 'AuthorizedIP' in data:
token_new.ip = token_new.ip = data['AuthorizedIP'].replace(' ','').split(',')
token_new.ip = token_new.ip = data['AuthorizedIP']
for ip in token_new.ip:
if (not validators.ip_address.ipv4(ip,cidr=True, strict=False, host_bit=False) and
not validators.ip_address.ipv6(ip,cidr=True, strict=False, host_bit=False)):
return { 'code': 400, 'message': f'Provided AuthorizedIP {ip} in {token_new.ip} is invalid'}, 400
raw_password = pwd.genword(entropy=128, length=32, charset="hex")
token_new.set_password(raw_password)
models.db.session.add(token_new)
@@ -144,7 +171,8 @@ class Token(Resource):
@token.route('/<string:token_id>')
class Token(Resource):
@token.doc('find_token')
@token.marshal_with(token_user_fields, code=200, description='Success', as_list=False, skip_none=True, mask=None)
@token.response(200, 'Success', token_user_fields)
@token.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@token.response(404, 'Token not found', response_fields)
@token.doc(security='Bearer')
@common.api_token_authorization
@@ -153,11 +181,48 @@ class Token(Resource):
token = models.Token.query.get(token_id)
if not token:
return { 'code' : 404, 'message' : f'Record cannot be found for id {token_id} or invalid id provided'}, 404
return token
response_dict = {
'id' : token.id,
'email' : token.user_email,
'comment' : token.comment,
'AuthorizedIP' : token.ip,
'Created': str(token.created_at),
'Last edit': str(token.updated_at)
}
return response_dict
@token.doc('update_token')
@token.expect(token_user_fields_post2)
@token.response(200, 'Success', response_fields)
@token.response(400, 'Input validation exception', response_fields)
@token.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@token.response(404, 'User not found', response_fields)
@token.doc(security='Bearer')
def patch(self, token_id):
""" Update the specified token """
data = api.payload
token = models.Token.query.get(token_id)
if not token:
return { 'code' : 404, 'message' : f'Record cannot be found for id {token_id} or invalid id provided'}, 404
if 'comment' in data:
token.comment = data['comment']
if 'AuthorizedIP' in data:
token.ip = token.ip = data['AuthorizedIP']
for ip in token.ip:
if (not validators.ip_address.ipv4(ip,cidr=True, strict=False, host_bit=False) and
not validators.ip_address.ipv6(ip,cidr=True, strict=False, host_bit=False)):
return { 'code': 400, 'message': f'Provided AuthorizedIP {ip} in {token.ip} is invalid'}, 400
models.db.session.add(token)
#apply the changes
db.session.commit()
return {'code': 200, 'message': f'Token with id {token_id} has been updated'}, 200
@token.doc('delete_token')
@token.response(200, 'Success', response_fields)
@token.response(400, 'Input validation exception', response_fields)
@token.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@token.response(404, 'Token not found', response_fields)
@token.doc(security='Bearer')
@common.api_token_authorization

View File

@@ -22,7 +22,7 @@ user_fields_get = api.model('UserGet', {
'enable_pop': fields.Boolean(description='Allow email retrieval via POP3'),
'allow_spoofing': fields.Boolean(description='Allow the user to spoof the sender (send email as anyone)'),
'forward_enabled': fields.Boolean(description='Enable auto forwarding'),
'forward_destination': fields.List(fields.String(description='Email address to forward emails to'), example='Other@example.com'),
'forward_destination': fields.List(fields.String(description='Email address to forward emails to'), example='["Other@example.com"]'),
'forward_keep': fields.Boolean(description='Keep a copy of the forwarded email in the inbox'),
'reply_enabled': fields.Boolean(description='Enable automatic replies. This is also known as out of office (ooo) or out of facility (oof) replies'),
'reply_subject': fields.String(description='Optional subject for the automatic reply', example='Out of office'),
@@ -47,7 +47,7 @@ user_fields_post = api.model('UserCreate', {
'enable_pop': fields.Boolean(description='Allow email retrieval via POP3'),
'allow_spoofing': fields.Boolean(description='Allow the user to spoof the sender (send email as anyone)'),
'forward_enabled': fields.Boolean(description='Enable auto forwarding'),
'forward_destination': fields.List(fields.String(description='Email address to forward emails to'), example='Other@example.com'),
'forward_destination': fields.List(fields.String(description='Email address to forward emails to'), example='["Other@example.com"]'),
'forward_keep': fields.Boolean(description='Keep a copy of the forwarded email in the inbox'),
'reply_enabled': fields.Boolean(description='Enable automatic replies. This is also known as out of office (ooo) or out of facility (oof) replies'),
'reply_subject': fields.String(description='Optional subject for the automatic reply', example='Out of office'),
@@ -71,7 +71,7 @@ user_fields_put = api.model('UserUpdate', {
'enable_pop': fields.Boolean(description='Allow email retrieval via POP3'),
'allow_spoofing': fields.Boolean(description='Allow the user to spoof the sender (send email as anyone)'),
'forward_enabled': fields.Boolean(description='Enable auto forwarding'),
'forward_destination': fields.List(fields.String(description='Email address to forward emails to'), example='Other@example.com'),
'forward_destination': fields.List(fields.String(description='Email address to forward emails to'), example='["Other@example.com"]'),
'forward_keep': fields.Boolean(description='Keep a copy of the forwarded email in the inbox'),
'reply_enabled': fields.Boolean(description='Enable automatic replies. This is also known as out of office (ooo) or out of facility (oof) replies'),
'reply_subject': fields.String(description='Optional subject for the automatic reply', example='Out of office'),
@@ -89,6 +89,7 @@ user_fields_put = api.model('UserUpdate', {
class Users(Resource):
@user.doc('list_user')
@user.marshal_with(user_fields_get, as_list=True, skip_none=True, mask=None)
@user.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@user.doc(security='Bearer')
@common.api_token_authorization
def get(self):
@@ -99,6 +100,7 @@ class Users(Resource):
@user.expect(user_fields_post)
@user.response(200, 'Success', response_fields)
@user.response(400, 'Input validation exception', response_fields)
@user.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@user.response(409, 'Duplicate user', response_fields)
@user.doc(security='Bearer')
@common.api_token_authorization
@@ -107,14 +109,22 @@ class Users(Resource):
data = api.payload
if not validators.email(data['email']):
return { 'code': 400, 'message': f'Provided email address {data["email"]} is not a valid email address'}, 400
if 'forward_destination' in data and len(data['forward_destination']) > 0:
for dest in data['forward_destination']:
if not validators.email(dest):
return { 'code': 400, 'message': f'Provided forward destination email address {dest} is not a valid email address'}, 400
localpart, domain_name = data['email'].lower().rsplit('@', 1)
domain_found = models.Domain.query.get(domain_name)
if not domain_found:
return { 'code': 404, 'message': f'Domain {domain_name} does not exist'}, 404
if not domain_found.max_users == -1 and len(domain_found.users) >= domain_found.max_users:
return { 'code': 409, 'message': f'Too many users for domain {domain_name}'}, 409
email_found = models.User.query.filter_by(email=data['email']).first()
if email_found:
return { 'code': 409, 'message': f'User {data["email"]} already exists'}, 409
if 'forward_enabled' in data and data['forward_enabled'] is True:
if ('forward_destination' in data and len(data['forward_destination']) == 0) or 'forward_destination' not in data:
return { 'code': 400, 'message': f'forward_destination is mandatory when forward_enabled is true'}, 400
user_new = models.User(email=data['email'])
if 'raw_password' in data:
@@ -137,7 +147,7 @@ class Users(Resource):
user_new.allow_spoofing = data['allow_spoofing']
if 'forward_enabled' in data:
user_new.forward_enabled = data['forward_enabled']
if 'forward_destination' in data:
if 'forward_destination' in data and len(data['forward_destination']) > 0:
user_new.forward_destination = data['forward_destination']
if 'forward_keep' in data:
user_new.forward_keep = data['forward_keep']
@@ -168,12 +178,12 @@ class Users(Resource):
return {'code': 200,'message': f'User {data["email"]} has been created'}, 200
@user.route('/<string:email>')
class User(Resource):
@user.doc('find_user')
@user.marshal_with(user_fields_get, code=200, description='Success', as_list=False, skip_none=True, mask=None)
@user.response(200, 'Success', user_fields_get)
@user.response(400, 'Input validation exception', response_fields)
@user.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@user.response(404, 'User not found', response_fields)
@user.doc(security='Bearer')
@common.api_token_authorization
@@ -191,6 +201,7 @@ class User(Resource):
@user.expect(user_fields_put)
@user.response(200, 'Success', response_fields)
@user.response(400, 'Input validation exception', response_fields)
@user.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@user.response(404, 'User not found', response_fields)
@user.doc(security='Bearer')
@common.api_token_authorization
@@ -198,10 +209,17 @@ class User(Resource):
""" Update the specified user """
data = api.payload
if not validators.email(email):
return { 'code': 400, 'message': f'Provided email address {data["email"]} is not a valid email address'}, 400
return { 'code': 400, 'message': f'Provided email address {email} is not a valid email address'}, 400
if 'forward_destination' in data and len(data['forward_destination']) > 0:
for dest in data['forward_destination']:
if not validators.email(dest):
return { 'code': 400, 'message': f'Provided forward destination email address {dest} is not a valid email address'}, 400
user_found = models.User.query.get(email)
if not user_found:
return {'code': 404, 'message': f'User {email} cannot be found'}, 404
if ('forward_enabled' in data and data['forward_enabled'] is True) or ('forward_enabled' not in data and user_found.forward_enabled):
if ('forward_destination' in data and len(data['forward_destination']) == 0):
return { 'code': 400, 'message': f'forward_destination is mandatory when forward_enabled is true'}, 400
if 'raw_password' in data:
user_found.set_password(data['raw_password'])
@@ -223,8 +241,9 @@ class User(Resource):
user_found.allow_spoofing = data['allow_spoofing']
if 'forward_enabled' in data:
user_found.forward_enabled = data['forward_enabled']
if 'forward_destination' in data:
user_found.forward_destination = data['forward_destination']
if 'forward_destination' in data and len(data['forward_destination']) > 0:
if len(data['forward_destination']) == 0:
user_found.forward_destination = data['forward_destination']
if 'forward_keep' in data:
user_found.forward_keep = data['forward_keep']
if 'reply_enabled' in data:
@@ -258,6 +277,7 @@ class User(Resource):
@user.doc('delete_user')
@user.response(200, 'Success', response_fields)
@user.response(400, 'Input validation exception', response_fields)
@user.doc(responses={401: 'Authorization header missing', 403: 'Invalid authorization header'})
@user.response(404, 'User not found', response_fields)
@user.doc(security='Bearer')
@common.api_token_authorization

View File

@@ -93,7 +93,12 @@ def user_settings(user_email):
form = forms.UserSettingsForm(obj=user)
utils.formatCSVField(form.forward_destination)
if form.validate_on_submit():
form.forward_destination.data = form.forward_destination.data.replace(" ","").split(",")
if form.forward_enabled.data and (form.forward_destination.data in ['', None] or type(form.forward_destination.data) is list):
flask.flash('Destination email address is missing', 'error')
user.forward_enabled = True
return flask.render_template('user/settings.html', form=form, user=user)
if form.forward_enabled.data:
form.forward_destination.data = form.forward_destination.data.replace(" ","").split(",")
form.populate_obj(user)
models.db.session.commit()
form.forward_destination.data = ", ".join(form.forward_destination.data)
@@ -101,6 +106,9 @@ def user_settings(user_email):
if user_email:
return flask.redirect(
flask.url_for('.user_list', domain_name=user.domain.name))
elif form.is_submitted() and not form.validate():
user.forward_enabled = form.forward_enabled.data
return flask.render_template('user/settings.html', form=form, user=user)
return flask.render_template('user/settings.html', form=form, user=user)
def _process_password_change(form, user_email):