mirror of
https://github.com/janeczku/calibre-web.git
synced 2025-01-10 04:19:00 +02:00
93b19165cf
Handling of missing tags in fb import naming of path is more imitating calibre (replacement of special characters, "pinyining" of author names if unidecode is available ) Sorting of authors (similar to calibre for jr./sr./I..IV endings) bugfix pathseparator on windows and linux during upload bugfix os.rename for authordir publishing date on detailview is formated according to slected locale filename on downloading from web ui is now correct displayed added ids to html for testing
384 lines
14 KiB
Python
Executable File
384 lines
14 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import db
|
|
import ub
|
|
from flask import current_app as app
|
|
import logging
|
|
import smtplib
|
|
import tempfile
|
|
import socket
|
|
import sys
|
|
import os
|
|
import traceback
|
|
import re
|
|
import unicodedata
|
|
from StringIO import StringIO
|
|
from email import encoders
|
|
from email.MIMEBase import MIMEBase
|
|
from email.MIMEMultipart import MIMEMultipart
|
|
from email.MIMEText import MIMEText
|
|
from email.generator import Generator
|
|
from flask_babel import gettext as _
|
|
import subprocess
|
|
import shutil
|
|
try:
|
|
import unidecode
|
|
use_unidecode=True
|
|
except:
|
|
use_unidecode=False
|
|
|
|
def update_download(book_id, user_id):
|
|
check = ub.session.query(ub.Downloads).filter(ub.Downloads.user_id == user_id).filter(ub.Downloads.book_id ==
|
|
book_id).first()
|
|
|
|
if not check:
|
|
new_download = ub.Downloads(user_id=user_id, book_id=book_id)
|
|
ub.session.add(new_download)
|
|
ub.session.commit()
|
|
|
|
|
|
def make_mobi(book_id, calibrepath):
|
|
vendorpath = os.path.join(os.path.normpath(os.path.dirname(os.path.realpath(__file__)) +
|
|
os.sep + "../vendor" + os.sep))
|
|
if sys.platform == "win32":
|
|
kindlegen = os.path.join(vendorpath, u"kindlegen.exe")
|
|
else:
|
|
kindlegen = os.path.join(vendorpath, u"kindlegen")
|
|
if not os.path.exists(kindlegen):
|
|
app.logger.error("make_mobi: kindlegen binary not found in: %s" % kindlegen)
|
|
return None
|
|
book = db.session.query(db.Books).filter(db.Books.id == book_id).first()
|
|
data = db.session.query(db.Data).filter(db.Data.book == book.id).filter(db.Data.format == 'EPUB').first()
|
|
if not data:
|
|
app.logger.error("make_mobi: epub format not found for book id: %d" % book_id)
|
|
return None
|
|
|
|
file_path = os.path.join(calibrepath, book.path, data.name)
|
|
if os.path.exists(file_path + u".epub"):
|
|
p = subprocess.Popen((kindlegen + " \"" + file_path + u".epub\" ").encode(sys.getfilesystemencoding()),
|
|
shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
|
|
# Poll process for new output until finished
|
|
while True:
|
|
nextline = p.stdout.readline()
|
|
if nextline == '' and p.poll() is not None:
|
|
break
|
|
if nextline != "\r\n":
|
|
app.logger.debug(nextline.strip('\r\n'))
|
|
|
|
check = p.returncode
|
|
if not check or check < 2:
|
|
book.data.append(db.Data(
|
|
name=book.data[0].name,
|
|
format="MOBI",
|
|
book=book.id,
|
|
uncompressed_size=os.path.getsize(file_path + ".mobi")
|
|
))
|
|
db.session.commit()
|
|
return file_path + ".mobi"
|
|
else:
|
|
app.logger.error("make_mobi: kindlegen failed with error while converting book")
|
|
return None
|
|
else:
|
|
app.logger.error("make_mobie: epub not found: %s.epub" % file_path)
|
|
return None
|
|
|
|
|
|
class StderrLogger(object):
|
|
|
|
buffer = ''
|
|
|
|
def __init__(self):
|
|
self.logger = logging.getLogger('cps.web')
|
|
|
|
def write(self, message):
|
|
if message == '\n':
|
|
self.logger.debug(self.buffer)
|
|
self.buffer = ''
|
|
else:
|
|
self.buffer += message
|
|
|
|
|
|
def send_raw_email(kindle_mail, msg):
|
|
settings = ub.get_mail_settings()
|
|
|
|
msg['From'] = settings["mail_from"]
|
|
msg['To'] = kindle_mail
|
|
|
|
use_ssl = int(settings.get('mail_use_ssl', 0))
|
|
|
|
# convert MIME message to string
|
|
fp = StringIO()
|
|
gen = Generator(fp, mangle_from_=False)
|
|
gen.flatten(msg)
|
|
msg = fp.getvalue()
|
|
|
|
# send email
|
|
try:
|
|
timeout = 600 # set timeout to 5mins
|
|
|
|
org_stderr = smtplib.stderr
|
|
smtplib.stderr = StderrLogger()
|
|
|
|
if use_ssl == 2:
|
|
mailserver = smtplib.SMTP_SSL(settings["mail_server"], settings["mail_port"], timeout)
|
|
else:
|
|
mailserver = smtplib.SMTP(settings["mail_server"], settings["mail_port"], timeout)
|
|
mailserver.set_debuglevel(1)
|
|
|
|
if use_ssl == 1:
|
|
mailserver.starttls()
|
|
|
|
if settings["mail_password"]:
|
|
mailserver.login(settings["mail_login"], settings["mail_password"])
|
|
mailserver.sendmail(settings["mail_login"], kindle_mail, msg)
|
|
mailserver.quit()
|
|
|
|
smtplib.stderr = org_stderr
|
|
|
|
except (socket.error, smtplib.SMTPRecipientsRefused, smtplib.SMTPException), e:
|
|
app.logger.error(traceback.print_exc())
|
|
return _("Failed to send mail: %s" % str(e))
|
|
|
|
return None
|
|
|
|
|
|
def send_test_mail(kindle_mail):
|
|
msg = MIMEMultipart()
|
|
msg['Subject'] = _(u'Calibre-web test email')
|
|
text = _(u'This email has been sent via calibre web.')
|
|
msg.attach(MIMEText(text.encode('UTF-8'), 'plain', 'UTF-8'))
|
|
return send_raw_email(kindle_mail, msg)
|
|
|
|
|
|
def send_mail(book_id, kindle_mail, calibrepath):
|
|
"""Send email with attachments"""
|
|
# create MIME message
|
|
msg = MIMEMultipart()
|
|
msg['Subject'] = _(u'Send to Kindle')
|
|
text = _(u'This email has been sent via calibre web.')
|
|
msg.attach(MIMEText(text.encode('UTF-8'), 'plain', 'UTF-8'))
|
|
|
|
book = db.session.query(db.Books).filter(db.Books.id == book_id).first()
|
|
data = db.session.query(db.Data).filter(db.Data.book == book.id)
|
|
|
|
formats = {}
|
|
|
|
for entry in data:
|
|
if entry.format == "MOBI":
|
|
formats["mobi"] = os.path.join(calibrepath, book.path, entry.name + ".mobi")
|
|
if entry.format == "EPUB":
|
|
formats["epub"] = os.path.join(calibrepath, book.path, entry.name + ".epub")
|
|
if entry.format == "PDF":
|
|
formats["pdf"] = os.path.join(calibrepath, book.path, entry.name + ".pdf")
|
|
|
|
if len(formats) == 0:
|
|
return _("Could not find any formats suitable for sending by email")
|
|
|
|
if 'mobi' in formats:
|
|
msg.attach(get_attachment(formats['mobi']))
|
|
elif 'epub' in formats:
|
|
filepath = make_mobi(book.id, calibrepath)
|
|
if filepath is not None:
|
|
msg.attach(get_attachment(filepath))
|
|
elif filepath is None:
|
|
return _("Could not convert epub to mobi")
|
|
elif 'pdf' in formats:
|
|
msg.attach(get_attachment(formats['pdf']))
|
|
elif 'pdf' in formats:
|
|
msg.attach(get_attachment(formats['pdf']))
|
|
else:
|
|
return _("Could not find any formats suitable for sending by email")
|
|
|
|
return send_raw_email(kindle_mail, msg)
|
|
|
|
|
|
def get_attachment(file_path):
|
|
"""Get file as MIMEBase message"""
|
|
|
|
try:
|
|
file_ = open(file_path, 'rb')
|
|
attachment = MIMEBase('application', 'octet-stream')
|
|
attachment.set_payload(file_.read())
|
|
file_.close()
|
|
encoders.encode_base64(attachment)
|
|
|
|
attachment.add_header('Content-Disposition', 'attachment',
|
|
filename=os.path.basename(file_path))
|
|
return attachment
|
|
except IOError:
|
|
traceback.print_exc()
|
|
app.logger.error = (u'The requested file could not be read. Maybe wrong permissions?')
|
|
return None
|
|
|
|
|
|
def get_valid_filename(value, replace_whitespace=True):
|
|
"""
|
|
Returns the given string converted to a string that can be used for a clean
|
|
filename. Limits num characters to 128 max.
|
|
"""
|
|
if value[-1:] ==u'.':
|
|
value = value[:-1]+u'_'
|
|
if use_unidecode:
|
|
value=(unidecode.unidecode(value)).strip()
|
|
else:
|
|
value=value.replace('§','SS')
|
|
value=value.replace('ß','ss')
|
|
value = unicodedata.normalize('NFKD', value)
|
|
re_slugify = re.compile('[\W\s-]', re.UNICODE)
|
|
value = unicode(re_slugify.sub('', value).strip())
|
|
if replace_whitespace:
|
|
#*+:\"/<>? werden durch _ ersetzt
|
|
value = re.sub('[\*\+:\\\"/<>\?]+', '_', value, flags=re.U)
|
|
|
|
value = value[:128]
|
|
return value
|
|
|
|
def get_sorted_author(value):
|
|
regexes = ["^(JR|SR)\.?$","^I{1,3}\.?$","^IV\.?$"]
|
|
combined = "(" + ")|(".join(regexes) + ")"
|
|
value = value.split(" ")
|
|
if re.match(combined,value[-1].upper()):
|
|
value2 = value[-2] + ", " + " ".join(value[:-2]) + " " + value[-1]
|
|
else:
|
|
value2 = value[-1] + ", " + " ".join(value[:-1])
|
|
return value2
|
|
|
|
|
|
def update_dir_stucture(book_id, calibrepath):
|
|
db.session.connection().connection.connection.create_function("title_sort", 1, db.title_sort)
|
|
book = db.session.query(db.Books).filter(db.Books.id == book_id).first()
|
|
path = os.path.join(calibrepath, book.path)#.replace('/',os.path.sep)).replace('\\',os.path.sep)
|
|
|
|
authordir = book.path.split('/')[0]
|
|
new_authordir = get_valid_filename(book.authors[0].name)
|
|
titledir = book.path.split('/')[1]
|
|
new_titledir = get_valid_filename(book.title) + " (" + str(book_id) + ")"
|
|
|
|
if titledir != new_titledir:
|
|
new_title_path = os.path.join(os.path.dirname(path), new_titledir)
|
|
os.rename(path, new_title_path)
|
|
path = new_title_path
|
|
book.path = book.path.split('/')[0] + '/' + new_titledir
|
|
|
|
if authordir != new_authordir:
|
|
new_author_path = os.path.join(os.path.join(calibrepath, new_authordir), os.path.basename(path))
|
|
os.rename(path, new_author_path)
|
|
book.path = new_authordir + '/' + book.path.split('/')[1]
|
|
db.session.commit()
|
|
|
|
|
|
def file_to_list(file):
|
|
return [x.strip() for x in open(file, 'r') if not x.startswith('#EXT')]
|
|
|
|
def one_minus_two(one, two):
|
|
return [x for x in one if x not in set(two)]
|
|
|
|
def reduce_dirs(delete_files, new_list):
|
|
new_delete = []
|
|
for file in delete_files:
|
|
parts = file.split(os.sep)
|
|
sub = ''
|
|
for i in range(len(parts)):
|
|
sub = os.path.join(sub, parts[i])
|
|
if sub == '':
|
|
sub = os.sep
|
|
count = 0
|
|
for song in new_list:
|
|
if song.startswith(sub):
|
|
count += 1
|
|
break
|
|
if count == 0:
|
|
if sub != '\\':
|
|
new_delete.append(sub)
|
|
break
|
|
return list(set(new_delete))
|
|
|
|
def reduce_files(remove_items, exclude_items):
|
|
rf = []
|
|
for item in remove_items:
|
|
if not item in exclude_items:
|
|
rf.append(item)
|
|
return rf
|
|
|
|
def moveallfiles(root_src_dir, root_dst_dir):
|
|
change_permissions = True
|
|
if sys.platform == "win32" or sys.platform == "darwin":
|
|
change_permissions=False
|
|
else:
|
|
app.logger.debug('Update on OS-System : '+sys.platform )
|
|
#print('OS-System: '+sys.platform )
|
|
new_permissions=os.stat(root_dst_dir)
|
|
#print new_permissions
|
|
for src_dir, dirs, files in os.walk(root_src_dir):
|
|
dst_dir = src_dir.replace(root_src_dir, root_dst_dir, 1)
|
|
if not os.path.exists(dst_dir):
|
|
os.makedirs(dst_dir)
|
|
#print('Create-Dir: '+dst_dir)
|
|
if change_permissions:
|
|
#print('Permissions: User '+str(new_permissions.st_uid)+' Group '+str(new_permissions.st_uid))
|
|
os.chown(dst_dir,new_permissions.st_uid,new_permissions.st_gid)
|
|
for file_ in files:
|
|
src_file = os.path.join(src_dir, file_)
|
|
dst_file = os.path.join(dst_dir, file_)
|
|
if os.path.exists(dst_file):
|
|
if change_permissions:
|
|
permission=os.stat(dst_file)
|
|
#print('Remove file before copy: '+dst_file)
|
|
os.remove(dst_file)
|
|
else:
|
|
if change_permissions:
|
|
permission=new_permissions
|
|
shutil.move(src_file, dst_dir)
|
|
#print('Move File '+src_file+' to '+dst_dir)
|
|
if change_permissions:
|
|
try:
|
|
os.chown(dst_file, permission.st_uid, permission.st_uid)
|
|
#print('Permissions: User '+str(new_permissions.st_uid)+' Group '+str(new_permissions.st_uid))
|
|
except:
|
|
e = sys.exc_info()
|
|
#print('Fail '+str(dst_file)+' error: '+str(e))
|
|
return
|
|
|
|
|
|
def update_source(source,destination):
|
|
# destination files
|
|
old_list=list()
|
|
exclude = (['vendor' + os.sep + 'kindlegen.exe','vendor' + os.sep + 'kindlegen','/app.db','vendor','/update.py'])
|
|
for root, dirs, files in os.walk(destination, topdown=True):
|
|
for name in files:
|
|
old_list.append(os.path.join(root, name).replace(destination, ''))
|
|
for name in dirs:
|
|
old_list.append(os.path.join(root, name).replace(destination, ''))
|
|
# source files
|
|
new_list = list()
|
|
for root, dirs, files in os.walk(source, topdown=True):
|
|
for name in files:
|
|
new_list.append(os.path.join(root, name).replace(source, ''))
|
|
for name in dirs:
|
|
new_list.append(os.path.join(root, name).replace(source, ''))
|
|
|
|
delete_files = one_minus_two(old_list, new_list)
|
|
#print('raw delete list', delete_files)
|
|
|
|
rf= reduce_files(delete_files, exclude)
|
|
#print('reduced delete list', rf)
|
|
|
|
remove_items = reduce_dirs(rf, new_list)
|
|
#print('delete files', remove_items)
|
|
|
|
moveallfiles(source, destination)
|
|
|
|
for item in remove_items:
|
|
item_path = os.path.join(destination, item[1:])
|
|
if os.path.isdir(item_path):
|
|
print("Delete dir "+ item_path)
|
|
shutil.rmtree(item_path)
|
|
else:
|
|
try:
|
|
print("Delete file "+ item_path)
|
|
os.remove(item_path)
|
|
except:
|
|
print("Could not remove:"+item_path)
|
|
shutil.rmtree(source, ignore_errors=True)
|