1
0
mirror of https://github.com/janeczku/calibre-web.git synced 2025-01-12 04:34:13 +02:00
calibre-web/cps/tasks/metadata_backup.py

256 lines
11 KiB
Python

# -*- coding: utf-8 -*-
# This file is part of the Calibre-Web (https://github.com/janeczku/calibre-web)
# Copyright (C) 2020 monkey
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import os
from html import escape
from urllib.request import urlopen

from flask_babel import lazy_gettext as N_
from lxml import etree
from lxml import objectify

from cps import config, db, gdriveutils, logger
from cps.services.worker import CalibreTask
# XML namespace URIs used by the generated metadata.opf document.
OPF_NAMESPACE = "http://www.idpf.org/2007/opf"
PURL_NAMESPACE = "http://purl.org/dc/elements/1.1/"

# Namespace maps passed as ``nsmap`` when elements are created.
NSMAP = dict(dc=PURL_NAMESPACE, opf=OPF_NAMESPACE)
OPF_NS = {None: OPF_NAMESPACE}  # the default namespace (no prefix)

# "{uri}" prefixes used to build namespace-qualified tag and attribute names.
PURL = "{" + PURL_NAMESPACE + "}"
OPF = "{" + OPF_NAMESPACE + "}"

# Register the prefixes used when serializing these namespaces.
etree.register_namespace("opf", OPF_NAMESPACE)
etree.register_namespace("dc", PURL_NAMESPACE)
class TaskBackupMetadata(CalibreTask):
    """Background task that maintains ``metadata.opf`` backup files for books.

    Depending on ``set_dirty`` the task either calls ``set_metadata_dirty`` for
    every book in the library, or works through the ``Metadata_Dirtied`` table
    and writes a new ``metadata.opf`` for each queued book.
    """

    def __init__(self, export_language="en",
                 translated_title="Cover",
                 set_dirty=False,
                 task_message=N_('Backing up Metadata')):
        """Create the task and open its own database session.

        :param export_language: language code written as ``dc:language`` when
            a book has no language of its own.
        :param translated_title: title attribute of the cover reference in
            the OPF ``guide`` element.
        :param set_dirty: if true, ``run`` queues all books for backup instead
            of writing the backups.
        :param task_message: message passed to the ``CalibreTask`` base class.
        """
        super(TaskBackupMetadata, self).__init__(task_message)
        self.log = logger.create()
        self.calibre_db = db.CalibreDB(expire_on_commit=False, init=True)
        self.export_language = export_language
        self.translated_title = translated_title
        self.set_dirty = set_dirty

    def run(self, worker_thread):
        """Task entry point; ``worker_thread`` is not used by this task."""
        if self.set_dirty:
            self.set_all_books_dirty()
        else:
            self.backup_metadata()

    def set_all_books_dirty(self):
        """Call ``set_metadata_dirty`` for every book and commit the result.

        Errors are logged and reported through ``_handleError`` and the
        transaction is rolled back; the session is closed in every case.
        """
        try:
            for book in self.calibre_db.session.query(db.Books).all():
                self.calibre_db.set_metadata_dirty(book.id)
            self.calibre_db.session.commit()
            self._handleSuccess()
        except Exception as ex:
            self.log.debug('Error adding book for backup: ' + str(ex))
            self._handleError('Error adding book for backup: ' + str(ex))
            self.calibre_db.session.rollback()
        finally:
            self.calibre_db.session.close()

    def backup_metadata(self):
        """Write a metadata backup for every entry of ``Metadata_Dirtied``.

        Each queue entry is deleted and committed before its backup is
        written. ``self.progress`` is updated after every entry. Errors are
        logged and reported through ``_handleError`` and the transaction is
        rolled back; the session is closed in every case.
        """
        # Must exist before the try block: the except handler reads it, and a
        # failure before the first loop iteration would otherwise raise
        # UnboundLocalError inside the handler and skip the error reporting.
        book = None
        try:
            metadata_backup = self.calibre_db.session.query(db.Metadata_Dirtied).all()
            custom_columns = (self.calibre_db.session.query(db.CustomColumns)
                              .filter(db.CustomColumns.mark_for_delete == 0)
                              .filter(db.CustomColumns.datatype.notin_(db.cc_exceptions))
                              .order_by(db.CustomColumns.label).all())
            count = len(metadata_backup)
            for i, backup in enumerate(metadata_backup, start=1):
                book = self.calibre_db.session.query(db.Books).filter(
                    db.Books.id == backup.book).one_or_none()
                self.calibre_db.session.query(db.Metadata_Dirtied).filter(
                    db.Metadata_Dirtied.book == backup.book).delete()
                self.calibre_db.session.commit()
                if book:
                    self.open_metadata(book, custom_columns)
                else:
                    self.log.error("Book {} not found in database".format(backup.book))
                self.progress = (1.0 / count) * i
            self._handleSuccess()
        except Exception as ex:
            # "NaN" is logged when no book object is available.
            book_id = getattr(book, 'id', "NaN")
            self.log.debug('Error creating metadata backup for book {}: '.format(book_id) + str(ex))
            self._handleError('Error creating metadata backup: ' + str(ex))
            self.calibre_db.session.rollback()
        finally:
            self.calibre_db.session.close()

    def open_metadata(self, book, custom_columns):
        """Create the metadata backup for ``book``.

        With Google Drive configured, the backup link is looked up and opened,
        and the stream is closed again without being read. Otherwise a new
        ``metadata.opf`` is written into the book's folder below
        ``config.config_calibre_dir``.

        :raises Exception: if Google Drive is not ready, no link is found,
            the link cannot be opened, or writing the file fails.
        """
        if config.config_use_google_drive:
            if not gdriveutils.is_gdrive_ready():
                raise Exception('Google Drive is configured but not ready')
            web_content_link = gdriveutils.get_metadata_backup_via_gdrive(book.path)
            if not web_content_link:
                raise Exception('Google Drive cover url not found')
            try:
                stream = urlopen(web_content_link)
            except Exception as ex:
                # Bubble exception to calling function
                self.log.debug('Error reading metadata.opf: ' + str(ex))  # ToDo Check whats going on
                raise
            stream.close()
        else:
            # ToDo: Handle book folder not found or not readable
            book_metadata_filepath = os.path.join(config.config_calibre_dir, book.path, 'metadata.opf')
            # The file is always regenerated; an existing metadata.opf is
            # overwritten rather than parsed and merged.
            self.create_new_metadata_backup(book, custom_columns, book_metadata_filepath)

    @staticmethod
    def _opf_timestamp(d):
        """Format a datetime as ``YYYY-MM-DDTHH:MM:SS``."""
        return '{d.year:04}-{d.month:02}-{d.day:02}T{d.hour:02}:{d.minute:02}:{d.second:02}'.format(d=d)

    @staticmethod
    def _add_dc(parent, tag, text, **attrib):
        """Append a Dublin Core element ``tag`` with ``text`` to ``parent``."""
        element = etree.SubElement(parent, PURL + tag, nsmap=NSMAP, **attrib)
        element.text = text
        return element

    @staticmethod
    def _add_meta(parent, name, content):
        """Append a ``meta`` element with ``name`` and ``content`` to ``parent``."""
        return etree.SubElement(parent, "meta", name=name, content=content, nsmap=NSMAP)

    def create_new_metadata_backup(self, book, custom_columns, book_metadata_filepath):
        """Build the OPF document for ``book`` and write it to disk.

        :param book: book row whose attributes are exported.
        :param custom_columns: custom column definitions; one
            ``calibre:user_metadata`` entry is written per column.
        :param book_metadata_filepath: target path of the ``metadata.opf``.
        :raises Exception: if the file cannot be written.
        """
        # generate root package element
        package = etree.Element(OPF + "package", nsmap=OPF_NS)
        package.set("unique-identifier", "uuid_id")
        package.set("version", "2.0")

        # generate metadata element and all sub elements of it
        metadata = etree.SubElement(package, "metadata", nsmap=NSMAP)
        calibre_identifier = self._add_dc(metadata, "identifier", str(book.id), id="calibre_id")
        calibre_identifier.set(OPF + "scheme", "calibre")
        uuid_identifier = self._add_dc(metadata, "identifier", book.uuid, id="uuid_id")
        uuid_identifier.set(OPF + "scheme", "uuid")
        self._add_dc(metadata, "title", book.title)
        for author in book.authors:
            creator = self._add_dc(metadata, "creator", str(author.name))
            creator.set(OPF + "file-as", book.author_sort)  # ToDo Check
            creator.set(OPF + "role", "aut")
        contributor = self._add_dc(metadata, "contributor",
                                   "calibre (5.7.2) [https://calibre-ebook.com]")
        contributor.set(OPF + "file-as", "calibre")  # ToDo Check
        contributor.set(OPF + "role", "bkp")
        self._add_dc(metadata, "date", self._opf_timestamp(book.pubdate))
        if book.comments and book.comments[0].text:
            for comment in book.comments:
                self._add_dc(metadata, "description", comment.text)
        for publisher in book.publishers:
            self._add_dc(metadata, "publisher", str(publisher.name))
        if not book.languages:
            # Fall back to the export language when the book has none.
            self._add_dc(metadata, "language", self.export_language)
        else:
            for language in book.languages:
                self._add_dc(metadata, "language", str(language.lang_code))
        for tag in book.tags:
            self._add_dc(metadata, "subject", str(tag.name))

        # Serialize with json so author names containing quotes or
        # backslashes still produce a valid JSON object.
        author_link_map = json.dumps({str(a.name): "" for a in book.authors}, ensure_ascii=False)
        self._add_meta(metadata, "calibre:author_link_map", author_link_map)
        for series in book.series:
            self._add_meta(metadata, "calibre:series", str(series.name))
        if book.series:
            self._add_meta(metadata, "calibre:series_index", str(book.series_index))
        if book.ratings and book.ratings[0].rating > 0:
            self._add_meta(metadata, "calibre:rating", str(book.ratings[0].rating))
        self._add_meta(metadata, "calibre:timestamp", self._opf_timestamp(book.timestamp))
        self._add_meta(metadata, "calibre:title_sort", book.sort)

        for sequence, cc in enumerate(custom_columns):
            value = None
            extra = None
            cc_entry = getattr(book, "custom_column_" + str(cc.id))
            if len(cc_entry):
                value = [c.value for c in cc_entry] if cc.is_multiple else cc_entry[0].value
                extra = getattr(cc_entry[0], "extra", None)
            self._add_meta(metadata, "calibre:user_metadata:#{}".format(cc.label),
                           cc.to_json(value, extra, sequence))

        # generate guide element and all sub elements of it
        # Title is translated from default export language
        guide = etree.SubElement(package, "guide")
        etree.SubElement(guide, "reference", type="cover", title=self.translated_title, href="cover.jpg")

        # prepare finalize everything and output
        doc = etree.ElementTree(package)
        try:
            with open(book_metadata_filepath, 'wb') as f:
                doc.write(f, xml_declaration=True, encoding='utf-8', pretty_print=True)
        except Exception as ex:
            raise Exception('Writing Metadata failed with error: {} '.format(ex)) from ex

    @property
    def name(self):
        """Name of this task."""
        return "Metadata backup"

    # needed for logging
    def __str__(self):
        if self.set_dirty:
            return "Queue all books for metadata backup"
        else:
            return "Perform metadata backup"

    @property
    def is_cancellable(self):
        """Always ``True``."""
        return True