1
0
mirror of https://github.com/Mailu/Mailu.git synced 2024-12-14 10:53:30 +02:00
Mailu/admin/manage.py

177 lines
5.1 KiB
Python
Raw Normal View History

from mailu import manager, db
from mailu.admin import models
2016-06-25 14:02:50 +02:00
@manager.command
def flushdb():
""" Flush the database
"""
db.drop_all()
@manager.command
def initdb():
""" Initialize the database
"""
db.create_all()
@manager.command
def admin(localpart, domain_name, password):
""" Create an admin user
"""
domain = models.Domain.query.get(domain_name)
if not domain:
domain = models.Domain(name=domain_name)
db.session.add(domain)
2016-06-25 14:02:50 +02:00
user = models.User(
localpart=localpart,
domain=domain,
global_admin=True
2016-06-25 14:02:50 +02:00
)
user.set_password(password)
2016-06-25 14:02:50 +02:00
db.session.add(user)
db.session.commit()
2016-06-25 12:57:47 +02:00
2016-12-14 09:16:48 +02:00
@manager.command
2017-08-24 16:23:54 +02:00
def user(localpart, domain_name, password, hash_scheme='SHA512-CRYPT'):
""" Create a user
2016-12-14 09:16:48 +02:00
"""
domain = models.Domain.query.get(domain_name)
if not domain:
domain = models.Domain(name=domain_name)
db.session.add(domain)
user = models.User(
localpart=localpart,
domain=domain,
global_admin=False
2016-12-14 09:16:48 +02:00
)
2017-08-24 16:23:54 +02:00
user.set_password(password, hash_scheme=hash_scheme)
db.session.add(user)
db.session.commit()
@manager.command
def user_import(localpart, domain_name, password_hash, hash_scheme='SHA512-CRYPT'):
""" Import a user along with password hash. Available hashes:
'SHA512-CRYPT'
'SHA256-CRYPT'
'MD5-CRYPT'
'CRYPT'
2017-08-24 16:23:54 +02:00
"""
domain = models.Domain.query.get(domain_name)
if not domain:
domain = models.Domain(name=domain_name)
db.session.add(domain)
user = models.User(
localpart=localpart,
domain=domain,
global_admin=False
)
user.set_password(password_hash, hash_scheme=hash_scheme, raw=True)
2016-12-14 09:16:48 +02:00
db.session.add(user)
db.session.commit()
2016-06-25 12:57:47 +02:00
@manager.command
2017-08-25 22:07:07 +02:00
def config_update(delete_objects=False):
"""sync configuration with data from YAML-formatted stdin"""
import yaml, sys
new_config=yaml.load(sys.stdin)
# print new_config
users=new_config['users']
2017-08-25 22:07:07 +02:00
tracked_users=set()
for user_config in users:
localpart=user_config['localpart']
domain_name=user_config['domain']
password_hash=user_config['password_hash']
hash_scheme=user_config['hash_scheme']
domain = models.Domain.query.get(domain_name)
2017-08-25 22:07:07 +02:00
email='{0}@{1}'.format(localpart,domain_name)
if not domain:
domain = models.Domain(name=domain_name)
db.session.add(domain)
2017-08-25 22:07:07 +02:00
user = models.User.query.get(email)
tracked_users.add(email)
if not user:
user = models.User(
localpart=localpart,
domain=domain,
global_admin=False
)
user.set_password(password_hash, hash_scheme=hash_scheme, raw=True)
db.session.add(user)
aliases=new_config['aliases']
2017-08-25 22:07:07 +02:00
tracked_aliases=set()
for alias_config in aliases:
localpart=alias_config['localpart']
domain_name=alias_config['domain']
destination=alias_config['destination']
domain = models.Domain.query.get(domain_name)
2017-08-25 22:07:07 +02:00
email='{0}@{1}'.format(localpart,domain_name)
if not domain:
domain = models.Domain(name=domain_name)
db.session.add(domain)
2017-08-25 22:07:07 +02:00
alias = models.Alias.query.get(email)
tracked_aliases.add(email)
if not alias:
alias = models.Alias(
localpart=localpart,
domain=domain,
destination=destination.split(','),
2017-08-25 22:07:07 +02:00
email=email
)
else:
alias.destination = destination.split(',')
db.session.add(alias)
2017-08-25 22:07:07 +02:00
if delete_objects:
for user in db.session.query(models.User).all():
if not ( user.email in tracked_users ):
db.session.delete(user)
for alias in db.session.query(models.Alias).all():
if not ( alias.email in tracked_aliases ):
db.session.delete(alias)
db.session.commit()
2016-12-14 10:40:49 +02:00
@manager.command
def alias(localpart, domain_name, destination):
""" Create an alias
"""
domain = models.Domain.query.get(domain_name)
if not domain:
domain = models.Domain(name=domain_name)
db.session.add(domain)
alias = models.Alias(
2016-12-14 10:40:49 +02:00
localpart=localpart,
domain=domain,
destination=destination.split(','),
email="%s@%s" % (localpart, domain_name)
)
db.session.add(alias)
db.session.commit()
# Set limits to a domain
@manager.command
def setlimits(domain_name, max_users, max_aliases, max_quota_bytes):
domain = models.Domain.query.get(domain_name)
domain.max_users = max_users
domain.max_aliases = max_aliases
domain.max_quota_bytes = max_quota_bytes
db.session.add(domain)
db.session.commit()
# Make the user manager of a domain
@manager.command
def setmanager(domain_name, user_name='manager'):
domain = models.Domain.query.get(domain_name)
manageruser = models.User.query.get(user_name + '@' + domain_name)
domain.managers.append(manageruser)
db.session.add(domain)
db.session.commit()
2016-06-25 12:57:47 +02:00
if __name__ == "__main__":
manager.run()