# -*- coding: utf-8 -*-
# This file is part of the Calibre-Web (https://github.com/janeczku/calibre-web)
# Copyright (C) 2012-2019 cervinko, idalin, SiphonSquirrel, ouzklcn, akushsky,
# OzzieIsaacs, bodybybuddha, jkrehm, matthazinski, janeczku
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division, print_function, unicode_literals
import sys
import os
import io
import json
import mimetypes
import random
import re
import shutil
import time
import unicodedata
from datetime import datetime, timedelta
from tempfile import gettempdir
import requests
from babel import Locale as LC
from babel.core import UnknownLocaleError
from babel.dates import format_datetime
from babel.units import format_unit
from flask import send_from_directory, make_response, redirect, abort
from flask_babel import gettext as _
from flask_login import current_user
from sqlalchemy.sql.expression import true, false, and_, or_, text, func
from werkzeug.datastructures import Headers
from werkzeug.security import generate_password_hash
try:
from urllib.parse import quote
except ImportError:
from urllib import quote
try:
import unidecode
use_unidecode = True
except ImportError:
use_unidecode = False
try:
from PIL import Image as PILImage
use_PIL = True
except ImportError:
use_PIL = False
from . import logger, config, get_locale, db, ub, isoLanguages, worker
from . import gdriveutils as gd
from .constants import STATIC_DIR as _STATIC_DIR
from .pagination import Pagination
from .subproc_wrapper import process_wait
from .worker import STAT_WAITING, STAT_FAIL, STAT_STARTED, STAT_FINISH_SUCCESS
from .worker import TASK_EMAIL, TASK_CONVERT, TASK_UPLOAD, TASK_CONVERT_ANY
log = logger.create()
# Convert existing book entry to new format
def convert_book_format(book_id, calibrepath, old_book_format, new_book_format, user_id, kindle_mail=None):
book = db.session.query(db.Books).filter(db.Books.id == book_id).first()
data = db.session.query(db.Data).filter(db.Data.book == book.id).filter(db.Data.format == old_book_format).first()
if not data:
error_message = _(u"%(format)s format not found for book id: %(book)d", format=old_book_format, book=book_id)
log.error("convert_book_format: %s", error_message)
return error_message
if config.config_use_google_drive:
df = gd.getFileFromEbooksFolder(book.path, data.name + "." + old_book_format.lower())
if df:
datafile = os.path.join(calibrepath, book.path, data.name + u"." + old_book_format.lower())
if not os.path.exists(os.path.join(calibrepath, book.path)):
os.makedirs(os.path.join(calibrepath, book.path))
df.GetContentFile(datafile)
else:
error_message = _(u"%(format)s not found on Google Drive: %(fn)s",
format=old_book_format, fn=data.name + "." + old_book_format.lower())
return error_message
file_path = os.path.join(calibrepath, book.path, data.name)
if os.path.exists(file_path + "." + old_book_format.lower()):
# read settings and append converter task to queue
if kindle_mail:
settings = config.get_mail_settings()
settings['subject'] = _('Send to Kindle') # pretranslate Subject for e-mail
settings['body'] = _(u'This e-mail has been sent via Calibre-Web.')
# text = _(u"%(format)s: %(book)s", format=new_book_format, book=book.title)
else:
settings = dict()
text = (u"%s -> %s: %s" % (old_book_format, new_book_format, book.title))
settings['old_book_format'] = old_book_format
settings['new_book_format'] = new_book_format
worker.add_convert(file_path, book.id, user_id, text, settings, kindle_mail)
return None
else:
error_message = _(u"%(format)s not found: %(fn)s",
format=old_book_format, fn=data.name + "." + old_book_format.lower())
return error_message
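# Sends a test e-mail with the configured mail settings to the given e-mail address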
def send_test_mail(kindle_mail, user_name):
worker.add_email(_(u'Calibre-Web test e-mail'), None, None,
config.get_mail_settings(), kindle_mail, user_name,
_(u"Test e-mail"), _(u'This e-mail has been sent via Calibre-Web.'))
return
# Sends a registration e-mail or a password-reset e-mail, depending on the resend parameter (False means welcome e-mail)
def send_registration_mail(e_mail, user_name, default_password, resend=False):
text = "Hello %s!\r\n" % user_name
if not resend:
text += "Your new account at Calibre-Web has been created. Thanks for joining us!\r\n"
text += "Please log in to your account using the following informations:\r\n"
text += "User name: %s\r\n" % user_name
text += "Password: %s\r\n" % default_password
text += "Don't forget to change your password after first login.\r\n"
text += "Sincerely\r\n\r\n"
text += "Your Calibre-Web team"
worker.add_email(_(u'Get Started with Calibre-Web'), None, None,
config.get_mail_settings(), e_mail, None,
_(u"Registration e-mail for user: %(name)s", name=user_name), text)
return
def check_send_to_kindle(entry):
"""
returns all available book formats for sending to Kindle
"""
if len(entry.data):
bookformats = list()
if not config.config_converterpath:
# no converter configured - only Mobi, Pdf and Azw formats can be sent as-is
for ele in iter(entry.data):
if 'MOBI' in ele.format:
bookformats.append({'format': 'Mobi',
'convert': 0,
'text': _('Send %(format)s to Kindle', format='Mobi')})
if 'PDF' in ele.format:
bookformats.append({'format': 'Pdf',
'convert': 0,
'text': _('Send %(format)s to Kindle', format='Pdf')})
if 'AZW' in ele.format:
bookformats.append({'format': 'Azw',
'convert': 0,
'text': _('Send %(format)s to Kindle', format='Azw')})
else:
formats = list()
for ele in iter(entry.data):
formats.append(ele.format)
if 'MOBI' in formats:
bookformats.append({'format': 'Mobi',
'convert': 0,
'text': _('Send %(format)s to Kindle', format='Mobi')})
if 'AZW' in formats:
bookformats.append({'format': 'Azw',
'convert': 0,
'text': _('Send %(format)s to Kindle', format='Azw')})
if 'PDF' in formats:
bookformats.append({'format': 'Pdf',
'convert': 0,
'text': _('Send %(format)s to Kindle', format='Pdf')})
if config.config_converterpath:
if 'EPUB' in formats and 'MOBI' not in formats:
bookformats.append({'format': 'Mobi',
'convert': 1,
'text': _('Convert %(orig)s to %(format)s and send to Kindle',
orig='Epub',
format='Mobi')})
if 'AZW3' in formats and 'MOBI' not in formats:
bookformats.append({'format': 'Mobi',
'convert': 2,
'text': _('Convert %(orig)s to %(format)s and send to Kindle',
orig='Azw3',
format='Mobi')})
return bookformats
else:
log.error(u'Cannot find book entry %d', entry.id)
return None
# Check whether a reader exists for any of the book's formats; if not, return an empty list,
# otherwise return a list of the supported formats
def check_read_formats(entry):
EXTENSIONS_READER = {'TXT', 'PDF', 'EPUB', 'CBZ', 'CBT', 'CBR'}
bookformats = list()
if len(entry.data):
for ele in iter(entry.data):
if ele.format.upper() in EXTENSIONS_READER:
bookformats.append(ele.format.lower())
return bookformats
# Files are processed in the following order/priority:
# 1: If a Mobi file exists, it is sent directly to the Kindle e-mail address,
# 2: If an Epub file exists, it is converted and then sent to the Kindle e-mail address,
# 3: If a Pdf file exists, it is sent directly to the Kindle e-mail address
def send_mail(book_id, book_format, convert, kindle_mail, calibrepath, user_id):
"""Send email with attachments"""
book = db.session.query(db.Books).filter(db.Books.id == book_id).first()
if convert == 1:
# returns None on success, otherwise an error message
return convert_book_format(book_id, calibrepath, u'epub', book_format.lower(), user_id, kindle_mail)
if convert == 2:
# returns None on success, otherwise an error message
return convert_book_format(book_id, calibrepath, u'azw3', book_format.lower(), user_id, kindle_mail)
for entry in iter(book.data):
if entry.format.upper() == book_format.upper():
converted_file_name = entry.name + '.' + book_format.lower()
worker.add_email(_(u"Send to Kindle"), book.path, converted_file_name,
config.get_mail_settings(), kindle_mail, user_id,
_(u"E-mail: %(book)s", book=book.title), _(u'This e-mail has been sent via Calibre-Web.'))
return
return _(u"The requested file could not be read. Maybe wrong permissions?")
def get_valid_filename(value, replace_whitespace=True):
"""
Returns the given string converted to a string that can be used for a clean
filename. Limits the length to a maximum of 128 characters.
"""
if value[-1:] == u'.':
value = value[:-1]+u'_'
value = value.replace("/", "_").replace(":", "_").strip('\0')
if use_unidecode:
value = (unidecode.unidecode(value)).strip()
else:
value = value.replace(u'§', u'SS')
value = value.replace(u'ß', u'ss')
value = unicodedata.normalize('NFKD', value)
re_slugify = re.compile(r'[\W\s-]', re.UNICODE)
if isinstance(value, str): # Python3 str, Python2 unicode
value = re_slugify.sub('', value).strip()
else:
value = unicode(re_slugify.sub('', value).strip())
if replace_whitespace:
# *+:\"/<>? are replaced by _
value = re.sub(r'[\*\+:\\\"/<>\?]+', u'_', value, flags=re.U)
# pipe has to be replaced with comma
value = re.sub(r'[\|]+', u',', value, flags=re.U)
value = value[:128]
if not value:
raise ValueError("Filename cannot be empty")
if sys.version_info.major == 3:
return value
else:
return value.decode('utf-8')
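# Derives the author sort string ('Lastname, Firstname') from an author name, handling suffixes like JR/SR and roman numerals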
def get_sorted_author(value):
try:
if ',' not in value:
regexes = [r"^(JR|SR)\.?$", r"^I{1,3}\.?$", r"^IV\.?$"]
combined = "(" + ")|(".join(regexes) + ")"
value = value.split(" ")
if re.match(combined, value[-1].upper()):
value2 = value[-2] + ", " + " ".join(value[:-2]) + " " + value[-1]
elif len(value) == 1:
value2 = value[0]
else:
value2 = value[-1] + ", " + " ".join(value[:-1])
else:
value2 = value
except Exception as ex:
log.error("Sorting author %s failed: %s", value, ex)
value2 = value
return value2
# Deletes a book from the local file storage; returns True if deleting was successful, otherwise False
def delete_book_file(book, calibrepath, book_format=None):
# check that path is 2 elements deep, check that target path has no subfolders
if book.path.count('/') == 1:
path = os.path.join(calibrepath, book.path)
if book_format:
for file in os.listdir(path):
if file.upper().endswith("."+book_format):
os.remove(os.path.join(path, file))
return True, None
else:
if os.path.isdir(path):
if len(next(os.walk(path))[1]):
log.error("Deleting book %s failed, path has subfolders: %s", book.id, book.path)
return False, _("Deleting book %(id)s failed, path has subfolders: %(path)s",
id=book.id,
path=book.path)
try:
for root, __, files in os.walk(path):
for f in files:
os.unlink(os.path.join(root, f))
shutil.rmtree(path)
except (IOError, OSError) as e:
log.error("Deleting book %s failed: %s", book.id, e)
return False, _("Deleting book %(id)s failed: %(message)s", id=book.id, message=e)
authorpath = os.path.join(calibrepath, os.path.split(book.path)[0])
if not os.listdir(authorpath):
try:
shutil.rmtree(authorpath)
except (IOError, OSError) as e:
log.error("Deleting authorpath for book %s failed: %s", book.id, e)
return True, None
else:
log.error("Deleting book %s failed, book path not valid: %s", book.id, book.path)
return True, _("Deleting book %(id)s, book path not valid: %(path)s",
id=book.id,
path=book.path)
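# Renames author and title folders plus the contained book files on the local filesystem; returns False on success or an error message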
def update_dir_structure_file(book_id, calibrepath, first_author):
localbook = db.session.query(db.Books).filter(db.Books.id == book_id).first()
path = os.path.join(calibrepath, localbook.path)
authordir = localbook.path.split('/')[0]
if first_author:
new_authordir = get_valid_filename(first_author)
else:
new_authordir = get_valid_filename(localbook.authors[0].name)
titledir = localbook.path.split('/')[1]
new_titledir = get_valid_filename(localbook.title) + " (" + str(book_id) + ")"
if titledir != new_titledir:
new_title_path = os.path.join(os.path.dirname(path), new_titledir)
try:
if not os.path.exists(new_title_path):
os.renames(path, new_title_path)
else:
log.info("Copying title: %s into existing: %s", path, new_title_path)
for dir_name, __, file_list in os.walk(path):
for file in file_list:
os.renames(os.path.join(dir_name, file),
os.path.join(new_title_path + dir_name[len(path):], file))
path = new_title_path
localbook.path = localbook.path.split('/')[0] + '/' + new_titledir
except OSError as ex:
log.error("Rename title from: %s to %s: %s", path, new_title_path, ex)
log.debug(ex, exc_info=True)
return _("Rename title from: '%(src)s' to '%(dest)s' failed with error: %(error)s",
src=path, dest=new_title_path, error=str(ex))
if authordir != new_authordir:
new_author_path = os.path.join(calibrepath, new_authordir, os.path.basename(path))
try:
os.renames(path, new_author_path)
localbook.path = new_authordir + '/' + localbook.path.split('/')[1]
except OSError as ex:
log.error("Rename author from: %s to %s: %s", path, new_author_path, ex)
log.debug(ex, exc_info=True)
return _("Rename author from: '%(src)s' to '%(dest)s' failed with error: %(error)s",
src=path, dest=new_author_path, error=str(ex))
# Rename all files from old names to new names
if authordir != new_authordir or titledir != new_titledir:
try:
new_name = get_valid_filename(localbook.title) + ' - ' + get_valid_filename(new_authordir)
path_name = os.path.join(calibrepath, new_authordir, os.path.basename(path))
for file_format in localbook.data:
os.renames(os.path.join(path_name, file_format.name + '.' + file_format.format.lower()),
os.path.join(path_name, new_name + '.' + file_format.format.lower()))
file_format.name = new_name
except OSError as ex:
log.error("Rename file in path %s to %s: %s", path, new_name, ex)
log.debug(ex, exc_info=True)
return _("Rename file in path '%(src)s' to '%(dest)s' failed with error: %(error)s",
src=path, dest=new_name, error=str(ex))
return False
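# Same as update_dir_structure_file, but for libraries stored on Google Drive; returns False on success or an error message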
def update_dir_structure_gdrive(book_id, first_author):
error = False
book = db.session.query(db.Books).filter(db.Books.id == book_id).first()
path = book.path
authordir = book.path.split('/')[0]
if first_author:
new_authordir = get_valid_filename(first_author)
else:
new_authordir = get_valid_filename(book.authors[0].name)
titledir = book.path.split('/')[1]
new_titledir = get_valid_filename(book.title) + u" (" + str(book_id) + u")"
if titledir != new_titledir:
gFile = gd.getFileFromEbooksFolder(os.path.dirname(book.path), titledir)
if gFile:
gFile['title'] = new_titledir
gFile.Upload()
book.path = book.path.split('/')[0] + u'/' + new_titledir
path = book.path
gd.updateDatabaseOnEdit(gFile['id'], book.path) # only child folder affected
else:
error = _(u'File %(file)s not found on Google Drive', file=book.path) # file not found
if authordir != new_authordir:
gFile = gd.getFileFromEbooksFolder(os.path.dirname(book.path), new_titledir)
if gFile:
gd.moveGdriveFolderRemote(gFile, new_authordir)
book.path = new_authordir + u'/' + book.path.split('/')[1]
path = book.path
gd.updateDatabaseOnEdit(gFile['id'], book.path)
else:
error = _(u'File %(file)s not found on Google Drive', file=authordir) # file not found
# Rename all files from old names to new names
if authordir != new_authordir or titledir != new_titledir:
new_name = get_valid_filename(book.title) + u' - ' + get_valid_filename(new_authordir)
for file_format in book.data:
gFile = gd.getFileFromEbooksFolder(path, file_format.name + u'.' + file_format.format.lower())
if not gFile:
error = _(u'File %(file)s not found on Google Drive', file=file_format.name) # file not found
break
gd.moveGdriveFileRemote(gFile, new_name + u'.' + file_format.format.lower())
file_format.name = new_name
return error
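# Deletes a single format or the whole book folder from Google Drive; returns a (success, error message) tuple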
def delete_book_gdrive(book, book_format):
error = None
if book_format:
name = ''
for entry in book.data:
if entry.format.upper() == book_format:
name = entry.name + '.' + book_format
gFile = gd.getFileFromEbooksFolder(book.path, name)
else:
gFile = gd.getFileFromEbooksFolder(os.path.dirname(book.path), book.path.split('/')[1])
if gFile:
gd.deleteDatabaseEntry(gFile['id'])
gFile.Trash()
else:
error = _(u'Book path %(path)s not found on Google Drive', path=book.path) # file not found
return error is None, error
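# Generates a new password for the given user and mails it; returns 0 on failure, 1 on success, 2 if no mail server is configured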
def reset_password(user_id):
existing_user = ub.session.query(ub.User).filter(ub.User.id == user_id).first()
if not existing_user:
return 0, None
password = generate_random_password()
existing_user.password = generate_password_hash(password)
if not config.get_mail_server_configured():
return 2, None
try:
ub.session.commit()
send_registration_mail(existing_user.email, existing_user.nickname, password, True)
return 1, existing_user.nickname
except Exception:
ub.session.rollback()
return 0, None
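# Returns a random 8-character password built from letters, digits and punctuation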
def generate_random_password():
s = "abcdefghijklmnopqrstuvwxyz01234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ!@#$%&*()?"
passlen = 8
return "".join(random.sample(s, passlen))
################################## External interface
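# Dispatches directory-structure updates to Google Drive or the local filesystem, depending on the configuration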
def update_dir_stucture(book_id, calibrepath, first_author=None):
if config.config_use_google_drive:
return update_dir_structure_gdrive(book_id, first_author)
else:
return update_dir_structure_file(book_id, calibrepath, first_author)
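# Dispatches book deletion to Google Drive or the local filesystem, depending on the configuration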
def delete_book(book, calibrepath, book_format):
if config.config_use_google_drive:
return delete_book_gdrive(book, book_format)
else:
return delete_book_file(book, calibrepath, book_format)
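# Returns the generic cover image if requested, otherwise None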
def get_cover_on_failure(use_generic_cover):
if use_generic_cover:
return send_from_directory(_STATIC_DIR, "generic_cover.jpg")
else:
return None
def get_book_cover(book_id):
book = db.session.query(db.Books).filter(db.Books.id == book_id).filter(common_filters()).first()
return get_book_cover_internal(book, use_generic_cover_on_failure=True)
def get_book_cover_with_uuid(book_uuid,
use_generic_cover_on_failure=True):
book = db.session.query(db.Books).filter(db.Books.uuid == book_uuid).first()
return get_book_cover_internal(book, use_generic_cover_on_failure)
def get_book_cover_internal(book,
use_generic_cover_on_failure):
if book and book.has_cover:
if config.config_use_google_drive:
try:
if not gd.is_gdrive_ready():
return get_cover_on_failure(use_generic_cover_on_failure)
path = gd.get_cover_via_gdrive(book.path)
if path:
return redirect(path)
else:
log.error('%s/cover.jpg not found on Google Drive', book.path)
return get_cover_on_failure(use_generic_cover_on_failure)
except Exception as e:
log.exception(e)
# traceback.print_exc()
return get_cover_on_failure(use_generic_cover_on_failure)
else:
cover_file_path = os.path.join(config.config_calibre_dir, book.path)
if os.path.isfile(os.path.join(cover_file_path, "cover.jpg")):
return send_from_directory(cover_file_path, "cover.jpg")
else:
return get_cover_on_failure(use_generic_cover_on_failure)
else:
return get_cover_on_failure(use_generic_cover_on_failure)
# saves book cover from url
def save_cover_from_url(url, book_path):
img = requests.get(url, timeout=10) # ToDo: Error Handling
return save_cover(img, book_path)
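# Writes the cover image (requests response or uploaded file) as saved_filename into filepath; returns a (success, error message) tuple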
def save_cover_from_filestorage(filepath, saved_filename, img):
if hasattr(img, '_content'):
f = open(os.path.join(filepath, saved_filename), "wb")
f.write(img._content)
f.close()
else:
# check if file path exists, otherwise create it, copy file to calibre path and delete temp file
if not os.path.exists(filepath):
try:
os.makedirs(filepath)
except OSError:
log.error(u"Failed to create path for cover")
return False, _(u"Failed to create path for cover")
try:
img.save(os.path.join(filepath, saved_filename))
except (IOError, OSError):
log.error(u"Cover-file is not a valid image file, or could not be stored")
return False, _(u"Cover-file is not a valid image file, or could not be stored")
return True, None
# saves book cover to gdrive or locally
def save_cover(img, book_path):
content_type = img.headers.get('content-type')
if use_PIL:
if content_type not in ('image/jpeg', 'image/png', 'image/webp'):
log.error("Only jpg/jpeg/png/webp files are supported as coverfile")
return False, _("Only jpg/jpeg/png/webp files are supported as coverfile")
# convert to jpg because calibre only supports jpg
if content_type in ('image/png', 'image/webp'):
if hasattr(img, 'stream'):
imgc = PILImage.open(img.stream)
else:
imgc = PILImage.open(io.BytesIO(img.content))
im = imgc.convert('RGB')
tmp_bytesio = io.BytesIO()
im.save(tmp_bytesio, format='JPEG')
img._content = tmp_bytesio.getvalue()
else:
if content_type != 'image/jpeg':
log.error("Only jpg/jpeg files are supported as coverfile")
return False, _("Only jpg/jpeg files are supported as coverfile")
if config.config_use_google_drive:
tmpDir = gettempdir()
ret, message = save_cover_from_filestorage(tmpDir, "uploaded_cover.jpg", img)
if ret is True:
gd.uploadFileToEbooksFolder(os.path.join(book_path, 'cover.jpg'),
os.path.join(tmpDir, "uploaded_cover.jpg"))
log.info("Cover is saved on Google Drive")
return True, None
else:
return False, message
else:
return save_cover_from_filestorage(os.path.join(config.config_calibre_dir, book_path), "cover.jpg", img)
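# Streams the requested book file to the client, either from Google Drive or from the local Calibre folder; epub files are converted to kepub for Kobo clients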
def do_download_file(book, book_format, client, data, headers):
if config.config_use_google_drive:
startTime = time.time()
df = gd.getFileFromEbooksFolder(book.path, data.name + "." + book_format)
log.debug('%s', time.time() - startTime)
if df:
return gd.do_gdrive_download(df, headers)
else:
abort(404)
else:
filename = os.path.join(config.config_calibre_dir, book.path)
if not os.path.isfile(os.path.join(filename, data.name + "." + book_format)):
# ToDo: improve error handling
log.error('File not found: %s', os.path.join(filename, data.name + "." + book_format))
if client == "kobo" and book_format == "epub":
filename = config.config_kepub_cache_dir
os.system('{0} "{1}" -o {2}'.format(
config.config_kepubify_path,
os.path.join(filename, data.name + "." + book_format),
filename))
book_format = "kepub.epub"
headers["Content-Disposition"] = headers["Content-Disposition"].replace(".epub", ".kepub.epub")
response = make_response(send_from_directory(filename, data.name + "." + book_format))
response.headers = headers
return response
##################################
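# Checks that the given UnRar binary exists and runs; returns an error message on failure, otherwise None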
def check_unrar(unrarLocation):
if not unrarLocation:
return
if not os.path.exists(unrarLocation):
return _('Unrar binary file not found')
try:
if sys.version_info < (3, 0):
unrarLocation = unrarLocation.encode(sys.getfilesystemencoding())
unrarLocation = [unrarLocation]
for lines in process_wait(unrarLocation):
value = re.search('UNRAR (.*) freeware', lines)
if value:
version = value.group(1)
log.debug("unrar version %s", version)
except OSError as err:
log.exception(err)
return _('Error executing UnRar')
def json_serial(obj):
"""JSON serializer for objects not serializable by default json code"""
if isinstance(obj, datetime):
return obj.isoformat()
if isinstance(obj, timedelta):
return {
'__type__': 'timedelta',
'days': obj.days,
'seconds': obj.seconds,
'microseconds': obj.microseconds,
}
# return obj.isoformat()
raise TypeError("Type %s not serializable" % type(obj))
# helper function for displaying the runtime of tasks
def format_runtime(runtime):
retVal = ""
if runtime.days:
retVal = format_unit(runtime.days, 'duration-day', length="long", locale=get_locale()) + ', '
mins, seconds = divmod(runtime.seconds, 60)
hours, minutes = divmod(mins, 60)
# ToDo: locale.number_symbols._data['timeSeparator'] -> localize time separator ?
if hours:
retVal += '{:d}:{:02d}:{:02d}s'.format(hours, minutes, seconds)
elif minutes:
retVal += '{:2d}:{:02d}s'.format(minutes, seconds)
else:
retVal += '{:2d}s'.format(seconds)
return retVal
# helper function to localize the status information in tasklist entries
def render_task_status(tasklist):
renderedtasklist = list()
for task in tasklist:
if task['user'] == current_user.nickname or current_user.role_admin():
if task['formStarttime']:
task['starttime'] = format_datetime(task['formStarttime'], format='short', locale=get_locale())
# task2['formStarttime'] = ""
else:
if 'starttime' not in task:
task['starttime'] = ""
if 'formRuntime' not in task:
task['runtime'] = ""
else:
task['runtime'] = format_runtime(task['formRuntime'])
# localize the task status
if isinstance(task['stat'], int):
if task['stat'] == STAT_WAITING:
task['status'] = _(u'Waiting')
elif task['stat'] == STAT_FAIL:
task['status'] = _(u'Failed')
elif task['stat'] == STAT_STARTED:
task['status'] = _(u'Started')
elif task['stat'] == STAT_FINISH_SUCCESS:
task['status'] = _(u'Finished')
else:
task['status'] = _(u'Unknown Status')
# localize the task type
if isinstance(task['taskType'], int):
if task['taskType'] == TASK_EMAIL:
task['taskMessage'] = _(u'E-mail: ') + task['taskMess']
elif task['taskType'] == TASK_CONVERT:
task['taskMessage'] = _(u'Convert: ') + task['taskMess']
elif task['taskType'] == TASK_UPLOAD:
task['taskMessage'] = _(u'Upload: ') + task['taskMess']
elif task['taskType'] == TASK_CONVERT_ANY:
task['taskMessage'] = _(u'Convert: ') + task['taskMess']
else:
task['taskMessage'] = _(u'Unknown Task: ') + task['taskMess']
renderedtasklist.append(task)
return renderedtasklist
# Language and content filters for displaying in the UI
def common_filters(allow_show_archived=False):
if not allow_show_archived:
archived_books = (
ub.session.query(ub.ArchivedBook)
.filter(ub.ArchivedBook.user_id == int(current_user.id))
.filter(ub.ArchivedBook.is_archived == True)
.all()
)
archived_book_ids = [archived_book.book_id for archived_book in archived_books]
archived_filter = db.Books.id.notin_(archived_book_ids)
else:
archived_filter = true()
if current_user.filter_language() != "all":
lang_filter = db.Books.languages.any(db.Languages.lang_code == current_user.filter_language())
else:
lang_filter = true()
negtags_list = current_user.list_denied_tags()
postags_list = current_user.list_allowed_tags()
neg_content_tags_filter = false() if negtags_list == [''] else db.Books.tags.any(db.Tags.name.in_(negtags_list))
pos_content_tags_filter = true() if postags_list == [''] else db.Books.tags.any(db.Tags.name.in_(postags_list))
if config.config_restricted_column:
pos_cc_list = current_user.allowed_column_value.split(',')
pos_content_cc_filter = true() if pos_cc_list == [''] else \
getattr(db.Books, 'custom_column_' + str(config.config_restricted_column)).\
any(db.cc_classes[config.config_restricted_column].value.in_(pos_cc_list))
neg_cc_list = current_user.denied_column_value.split(',')
neg_content_cc_filter = false() if neg_cc_list == [''] else \
getattr(db.Books, 'custom_column_' + str(config.config_restricted_column)).\
any(db.cc_classes[config.config_restricted_column].value.in_(neg_cc_list))
else:
pos_content_cc_filter = true()
neg_content_cc_filter = false()
return and_(lang_filter, pos_content_tags_filter, ~neg_content_tags_filter,
pos_content_cc_filter, ~neg_content_cc_filter, archived_filter)
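# Tag filter built from the current user's allowed and denied tags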
def tags_filters():
negtags_list = current_user.list_denied_tags()
postags_list = current_user.list_allowed_tags()
neg_content_tags_filter = false() if negtags_list == [''] else db.Tags.name.in_(negtags_list)
pos_content_tags_filter = true() if postags_list == [''] else db.Tags.name.in_(postags_list)
return and_(pos_content_tags_filter, ~neg_content_tags_filter)
# return ~(false()) if postags_list == [''] else db.Tags.in_(postags_list)
# Creates a translated, human-readable name for every stored language, for display in the UI
def speaking_language(languages=None):
if not languages:
languages = db.session.query(db.Languages).join(db.books_languages_link).join(db.Books)\
.filter(common_filters())\
.group_by(text('books_languages_link.lang_code')).all()
for lang in languages:
try:
cur_l = LC.parse(lang.lang_code)
lang.name = cur_l.get_language_name(get_locale())
except UnknownLocaleError:
lang.name = _(isoLanguages.get(part3=lang.lang_code).name)
return languages
# checks if domain is in database (including wildcards)
# example SELECT * FROM @TABLE WHERE 'abcdefg' LIKE Name;
# from https://code.luasoftware.com/tutorials/flask/execute-raw-sql-in-flask-sqlalchemy/
def check_valid_domain(domain_text):
# domain_text = domain_text.split('@', 1)[-1].lower()
sql = "SELECT * FROM registration WHERE (:domain LIKE domain and allow = 1);"
result = ub.session.query(ub.Registration).from_statement(text(sql)).params(domain=domain_text).all()
if not len(result):
return False
sql = "SELECT * FROM registration WHERE (:domain LIKE domain and allow = 0);"
result = ub.session.query(ub.Registration).from_statement(text(sql)).params(domain=domain_text).all()
return not len(result)
# Orders all authors in the list according to the author sort field
def order_authors(entry):
sort_authors = entry.author_sort.split('&')
authors_ordered = list()
error = False
for auth in sort_authors:
# ToDo: How to handle not found authorname
result = db.session.query(db.Authors).filter(db.Authors.sort == auth.lstrip().strip()).first()
if not result:
error = True
break
authors_ordered.append(result)
if not error:
entry.authors = authors_ordered
return entry
# Fill indexpage with all requested data from database
def fill_indexpage(page, database, db_filter, order, *join):
return fill_indexpage_with_archived_books(page, database, db_filter, order, False, *join)
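# Like fill_indexpage, but can optionally include books the current user has archived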
def fill_indexpage_with_archived_books(page, database, db_filter, order, allow_show_archived, *join):
if current_user.show_detail_random():
randm = db.session.query(db.Books).filter(common_filters(allow_show_archived))\
.order_by(func.random()).limit(config.config_random_books)
else:
randm = false()
off = int(int(config.config_books_per_page) * (page - 1))
pagination = Pagination(page, config.config_books_per_page,
len(db.session.query(database).filter(db_filter)
.filter(common_filters(allow_show_archived)).all()))
entries = db.session.query(database).join(*join, isouter=True).filter(db_filter)\
.filter(common_filters(allow_show_archived))\
.order_by(*order).offset(off).limit(config.config_books_per_page).all()
for book in entries:
book = order_authors(book)
return entries, randm, pagination
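# Returns typeahead suggestions (as JSON) for the given table and search string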
def get_typeahead(database, query, replace=('', ''), tag_filter=true()):
query = query or ''
db.session.connection().connection.connection.create_function("lower", 1, lcase)
entries = db.session.query(database).filter(tag_filter).\
filter(func.lower(database.name).ilike("%" + query + "%")).all()
json_dumps = json.dumps([dict(name=r.name.replace(*replace)) for r in entries])
return json_dumps
# Read search results from the Calibre database and return them (used for feeds and simple search)
def get_search_results(term):
db.session.connection().connection.connection.create_function("lower", 1, lcase)
q = list()
authorterms = re.split("[, ]+", term)
for authorterm in authorterms:
q.append(db.Books.authors.any(func.lower(db.Authors.name).ilike("%" + authorterm + "%")))
db.Books.authors.any(func.lower(db.Authors.name).ilike("%" + term + "%"))
return db.session.query(db.Books).filter(common_filters()).filter(
or_(db.Books.tags.any(func.lower(db.Tags.name).ilike("%" + term + "%")),
db.Books.series.any(func.lower(db.Series.name).ilike("%" + term + "%")),
db.Books.authors.any(and_(*q)),
db.Books.publishers.any(func.lower(db.Publishers.name).ilike("%" + term + "%")),
func.lower(db.Books.title).ilike("%" + term + "%")
)).order_by(db.Books.sort).all()
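# Returns all custom columns of the Calibre database that are supported and not configured to be ignored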
def get_cc_columns(filter_config_custom_read=False):
tmpcc = db.session.query(db.Custom_Columns).filter(db.Custom_Columns.datatype.notin_(db.cc_exceptions)).all()
cc = []
r = None
if config.config_columns_to_ignore:
r = re.compile(config.config_columns_to_ignore)
for col in tmpcc:
if filter_config_custom_read and config.config_read_column and config.config_read_column == col.id:
continue
if r and r.match(col.label):
continue
cc.append(col)
return cc
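# Builds the download response for a book in the requested format, including content type and filename headers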
def get_download_link(book_id, book_format, client):
book_format = book_format.split(".")[0]
book = db.session.query(db.Books).filter(db.Books.id == book_id).filter(common_filters()).first()
if book:
data1 = db.session.query(db.Data).filter(db.Data.book == book.id)\
.filter(db.Data.format == book_format.upper()).first()
else:
abort(404)
if data1:
# collect downloaded books only for registered user and not for anonymous user
if current_user.is_authenticated:
ub.update_download(book_id, int(current_user.id))
file_name = book.title
if len(book.authors) > 0:
file_name = book.authors[0].name + '_' + file_name
file_name = get_valid_filename(file_name)
headers = Headers()
headers["Content-Type"] = mimetypes.types_map.get('.' + book_format, "application/octet-stream")
headers["Content-Disposition"] = "attachment; filename=%s.%s; filename*=UTF-8''%s.%s" % (
quote(file_name.encode('utf-8')), book_format, quote(file_name.encode('utf-8')), book_format)
return do_download_file(book, book_format, client, data1, headers)
else:
abort(404)
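# Checks whether a book with the given author(s) and title already exists in the database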
def check_exists_book(authr, title):
db.session.connection().connection.connection.create_function("lower", 1, lcase)
q = list()
authorterms = re.split(r'\s*&\s*', authr)
for authorterm in authorterms:
q.append(db.Books.authors.any(func.lower(db.Authors.name).ilike("%" + authorterm + "%")))
return db.session.query(db.Books).filter(
and_(db.Books.authors.any(and_(*q)),
func.lower(db.Books.title).ilike("%" + title + "%")
)).first()
############### Database Helper functions
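# Lower-cases and transliterates a string; registered as the SQLite 'lower' function for case-insensitive matching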
def lcase(s):
try:
return unidecode.unidecode(s.lower())
except Exception as e:
log.exception(e)