mirror of
https://github.com/janeczku/calibre-web.git
synced 2025-01-24 05:26:33 +02:00
ebe7cd7ba4
Cookies are saved in database for better Invalidation Cookies expiry date is saved in database for further deletion (missing) Database conversion is missing
405 lines
18 KiB
Python
405 lines
18 KiB
Python
# This file is part of the Calibre-Web (https://github.com/janeczku/calibre-web)
|
|
# Copyright (C) 2022 OzzieIsaacs
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import json
|
|
from datetime import datetime
|
|
|
|
from flask import Blueprint, request, redirect, url_for, flash
|
|
from flask import session as flask_session
|
|
from .cw_login import current_user
|
|
from flask_babel import format_date
|
|
from flask_babel import gettext as _
|
|
from sqlalchemy.sql.expression import func, not_, and_, or_, text, true
|
|
from sqlalchemy.sql.functions import coalesce
|
|
|
|
from . import logger, db, calibre_db, config, ub
|
|
from .usermanagement import login_required_if_no_ano
|
|
from .render_template import render_title_template
|
|
from .pagination import Pagination
|
|
|
|
|
|
search = Blueprint('search', __name__)
|
|
|
|
log = logger.create()
|
|
|
|
|
|
@search.route("/search", methods=["GET"])
|
|
@login_required_if_no_ano
|
|
def simple_search():
|
|
term = request.args.get("query")
|
|
if term:
|
|
return redirect(url_for('web.books_list', data="search", sort_param='stored', query=term.strip()))
|
|
else:
|
|
return render_title_template('search.html',
|
|
searchterm="",
|
|
result_count=0,
|
|
title=_("Search"),
|
|
page="search")
|
|
|
|
|
|
@search.route("/advsearch", methods=['POST'])
|
|
@login_required_if_no_ano
|
|
def advanced_search():
|
|
values = dict(request.form)
|
|
params = ['include_tag', 'exclude_tag', 'include_serie', 'exclude_serie', 'include_shelf', 'exclude_shelf',
|
|
'include_language', 'exclude_language', 'include_extension', 'exclude_extension']
|
|
for param in params:
|
|
values[param] = list(request.form.getlist(param))
|
|
flask_session['query'] = json.dumps(values)
|
|
return redirect(url_for('web.books_list', data="advsearch", sort_param='stored', query=""))
|
|
|
|
|
|
@search.route("/advsearch", methods=['GET'])
|
|
@login_required_if_no_ano
|
|
def advanced_search_form():
|
|
# Build custom columns names
|
|
cc = calibre_db.get_cc_columns(config, filter_config_custom_read=True)
|
|
return render_prepare_search_form(cc)
|
|
|
|
|
|
def adv_search_custom_columns(cc, term, q):
|
|
for c in cc:
|
|
if c.datatype == "datetime":
|
|
custom_start = term.get('custom_column_' + str(c.id) + '_start')
|
|
custom_end = term.get('custom_column_' + str(c.id) + '_end')
|
|
if custom_start:
|
|
q = q.filter(getattr(db.Books, 'custom_column_' + str(c.id)).any(
|
|
func.datetime(db.cc_classes[c.id].value) >= func.datetime(custom_start)))
|
|
if custom_end:
|
|
q = q.filter(getattr(db.Books, 'custom_column_' + str(c.id)).any(
|
|
func.datetime(db.cc_classes[c.id].value) <= func.datetime(custom_end)))
|
|
else:
|
|
custom_query = term.get('custom_column_' + str(c.id))
|
|
if custom_query != '' and custom_query is not None:
|
|
if c.datatype == 'bool':
|
|
q = q.filter(getattr(db.Books, 'custom_column_' + str(c.id)).any(
|
|
db.cc_classes[c.id].value == (custom_query == "True")))
|
|
elif c.datatype == 'int' or c.datatype == 'float':
|
|
q = q.filter(getattr(db.Books, 'custom_column_' + str(c.id)).any(
|
|
db.cc_classes[c.id].value == custom_query))
|
|
elif c.datatype == 'rating':
|
|
q = q.filter(getattr(db.Books, 'custom_column_' + str(c.id)).any(
|
|
db.cc_classes[c.id].value == int(float(custom_query) * 2)))
|
|
else:
|
|
q = q.filter(getattr(db.Books, 'custom_column_' + str(c.id)).any(
|
|
func.lower(db.cc_classes[c.id].value).ilike("%" + custom_query + "%")))
|
|
return q
|
|
|
|
|
|
def adv_search_language(q, include_languages_inputs, exclude_languages_inputs):
|
|
if current_user.filter_language() != "all":
|
|
q = q.filter(db.Books.languages.any(db.Languages.lang_code == current_user.filter_language()))
|
|
else:
|
|
for language in include_languages_inputs:
|
|
q = q.filter(db.Books.languages.any(db.Languages.id == language))
|
|
for language in exclude_languages_inputs:
|
|
q = q.filter(not_(db.Books.series.any(db.Languages.id == language)))
|
|
return q
|
|
|
|
|
|
def adv_search_ratings(q, rating_high, rating_low):
|
|
if rating_high:
|
|
rating_high = int(rating_high) * 2
|
|
q = q.filter(db.Books.ratings.any(db.Ratings.rating <= rating_high))
|
|
if rating_low:
|
|
rating_low = int(rating_low) * 2
|
|
q = q.filter(db.Books.ratings.any(db.Ratings.rating >= rating_low))
|
|
return q
|
|
|
|
|
|
def adv_search_read_status(read_status):
|
|
if not config.config_read_column:
|
|
if read_status == "True":
|
|
db_filter = and_(ub.ReadBook.user_id == int(current_user.id),
|
|
ub.ReadBook.read_status == ub.ReadBook.STATUS_FINISHED)
|
|
else:
|
|
db_filter = coalesce(ub.ReadBook.read_status, 0) != ub.ReadBook.STATUS_FINISHED
|
|
else:
|
|
try:
|
|
if read_status == "True":
|
|
db_filter = db.cc_classes[config.config_read_column].value == True
|
|
else:
|
|
db_filter = coalesce(db.cc_classes[config.config_read_column].value, False) != True
|
|
except (KeyError, AttributeError, IndexError):
|
|
log.error("Custom Column No.{} does not exist in calibre database".format(config.config_read_column))
|
|
flash(_("Custom Column No.%(column)d does not exist in calibre database",
|
|
column=config.config_read_column),
|
|
category="error")
|
|
return true()
|
|
return db_filter
|
|
|
|
|
|
def adv_search_extension(q, include_extension_inputs, exclude_extension_inputs):
|
|
for extension in include_extension_inputs:
|
|
q = q.filter(db.Books.data.any(db.Data.format == extension))
|
|
for extension in exclude_extension_inputs:
|
|
q = q.filter(not_(db.Books.data.any(db.Data.format == extension)))
|
|
return q
|
|
|
|
|
|
def adv_search_tag(q, include_tag_inputs, exclude_tag_inputs):
|
|
for tag in include_tag_inputs:
|
|
q = q.filter(db.Books.tags.any(db.Tags.id == tag))
|
|
for tag in exclude_tag_inputs:
|
|
q = q.filter(not_(db.Books.tags.any(db.Tags.id == tag)))
|
|
return q
|
|
|
|
|
|
def adv_search_serie(q, include_series_inputs, exclude_series_inputs):
|
|
for serie in include_series_inputs:
|
|
q = q.filter(db.Books.series.any(db.Series.id == serie))
|
|
for serie in exclude_series_inputs:
|
|
q = q.filter(not_(db.Books.series.any(db.Series.id == serie)))
|
|
return q
|
|
|
|
def adv_search_shelf(q, include_shelf_inputs, exclude_shelf_inputs):
|
|
q = q.outerjoin(ub.BookShelf, db.Books.id == ub.BookShelf.book_id)\
|
|
.filter(or_(ub.BookShelf.shelf == None, ub.BookShelf.shelf.notin_(exclude_shelf_inputs)))
|
|
if len(include_shelf_inputs) > 0:
|
|
q = q.filter(ub.BookShelf.shelf.in_(include_shelf_inputs))
|
|
return q
|
|
|
|
def extend_search_term(searchterm,
|
|
author_name,
|
|
book_title,
|
|
publisher,
|
|
pub_start,
|
|
pub_end,
|
|
tags,
|
|
rating_high,
|
|
rating_low,
|
|
read_status,
|
|
):
|
|
searchterm.extend((author_name.replace('|', ','), book_title, publisher))
|
|
if pub_start:
|
|
try:
|
|
searchterm.extend([_("Published after ") +
|
|
format_date(datetime.strptime(pub_start, "%Y-%m-%d"),
|
|
format='medium')])
|
|
except ValueError:
|
|
pub_start = ""
|
|
if pub_end:
|
|
try:
|
|
searchterm.extend([_("Published before ") +
|
|
format_date(datetime.strptime(pub_end, "%Y-%m-%d"),
|
|
format='medium')])
|
|
except ValueError:
|
|
pub_end = ""
|
|
elements = {'tag': db.Tags, 'serie':db.Series, 'shelf':ub.Shelf}
|
|
for key, db_element in elements.items():
|
|
tag_names = calibre_db.session.query(db_element).filter(db_element.id.in_(tags['include_' + key])).all()
|
|
searchterm.extend(tag.name for tag in tag_names)
|
|
tag_names = calibre_db.session.query(db_element).filter(db_element.id.in_(tags['exclude_' + key])).all()
|
|
searchterm.extend(tag.name for tag in tag_names)
|
|
language_names = calibre_db.session.query(db.Languages). \
|
|
filter(db.Languages.id.in_(tags['include_language'])).all()
|
|
if language_names:
|
|
language_names = calibre_db.speaking_language(language_names)
|
|
searchterm.extend(language.name for language in language_names)
|
|
language_names = calibre_db.session.query(db.Languages). \
|
|
filter(db.Languages.id.in_(tags['exclude_language'])).all()
|
|
if language_names:
|
|
language_names = calibre_db.speaking_language(language_names)
|
|
searchterm.extend(language.name for language in language_names)
|
|
if rating_high:
|
|
searchterm.extend([_("Rating <= %(rating)s", rating=rating_high)])
|
|
if rating_low:
|
|
searchterm.extend([_("Rating >= %(rating)s", rating=rating_low)])
|
|
if read_status != "Any":
|
|
searchterm.extend([_("Read Status = '%(status)s'", status=read_status)])
|
|
searchterm.extend(ext for ext in tags['include_extension'])
|
|
searchterm.extend(ext for ext in tags['exclude_extension'])
|
|
# handle custom columns
|
|
searchterm = " + ".join(filter(None, searchterm))
|
|
return searchterm, pub_start, pub_end
|
|
|
|
|
|
def render_adv_search_results(term, offset=None, order=None, limit=None):
|
|
sort = order[0] if order else [db.Books.sort]
|
|
pagination = None
|
|
|
|
cc = calibre_db.get_cc_columns(config, filter_config_custom_read=True)
|
|
calibre_db.session.connection().connection.connection.create_function("lower", 1, db.lcase)
|
|
query = calibre_db.generate_linked_query(config.config_read_column, db.Books)
|
|
q = query.outerjoin(db.books_series_link, db.Books.id == db.books_series_link.c.book)\
|
|
.outerjoin(db.Series)\
|
|
.filter(calibre_db.common_filters(True))
|
|
|
|
# parse multi selects to a complete dict
|
|
tags = dict()
|
|
elements = ['tag', 'serie', 'shelf', 'language', 'extension']
|
|
for element in elements:
|
|
tags['include_' + element] = term.get('include_' + element)
|
|
tags['exclude_' + element] = term.get('exclude_' + element)
|
|
|
|
author_name = term.get("author_name")
|
|
book_title = term.get("book_title")
|
|
publisher = term.get("publisher")
|
|
pub_start = term.get("publishstart")
|
|
pub_end = term.get("publishend")
|
|
rating_low = term.get("ratinghigh")
|
|
rating_high = term.get("ratinglow")
|
|
description = term.get("comment")
|
|
read_status = term.get("read_status")
|
|
if author_name:
|
|
author_name = author_name.strip().lower().replace(',', '|')
|
|
if book_title:
|
|
book_title = book_title.strip().lower()
|
|
if publisher:
|
|
publisher = publisher.strip().lower()
|
|
|
|
search_term = []
|
|
cc_present = False
|
|
for c in cc:
|
|
if c.datatype == "datetime":
|
|
column_start = term.get('custom_column_' + str(c.id) + '_start')
|
|
column_end = term.get('custom_column_' + str(c.id) + '_end')
|
|
if column_start:
|
|
search_term.extend(["{} >= {}".format(c.name,
|
|
format_date(datetime.strptime(column_start, "%Y-%m-%d").date(),
|
|
format='medium')
|
|
)])
|
|
cc_present = True
|
|
if column_end:
|
|
search_term.extend(["{} <= {}".format(c.name,
|
|
format_date(datetime.strptime(column_end, "%Y-%m-%d").date(),
|
|
format='medium')
|
|
)])
|
|
cc_present = True
|
|
elif term.get('custom_column_' + str(c.id)):
|
|
search_term.extend([("{}: {}".format(c.name, term.get('custom_column_' + str(c.id))))])
|
|
cc_present = True
|
|
|
|
if any(tags.values()) or author_name or book_title or publisher or pub_start or pub_end or rating_low \
|
|
or rating_high or description or cc_present or read_status != "Any":
|
|
search_term, pub_start, pub_end = extend_search_term(search_term,
|
|
author_name,
|
|
book_title,
|
|
publisher,
|
|
pub_start,
|
|
pub_end,
|
|
tags,
|
|
rating_high,
|
|
rating_low,
|
|
read_status)
|
|
if author_name:
|
|
q = q.filter(db.Books.authors.any(func.lower(db.Authors.name).ilike("%" + author_name + "%")))
|
|
if book_title:
|
|
q = q.filter(func.lower(db.Books.title).ilike("%" + book_title + "%"))
|
|
if pub_start:
|
|
q = q.filter(func.datetime(db.Books.pubdate) > func.datetime(pub_start))
|
|
if pub_end:
|
|
q = q.filter(func.datetime(db.Books.pubdate) < func.datetime(pub_end))
|
|
if read_status != "Any":
|
|
q = q.filter(adv_search_read_status(read_status))
|
|
if publisher:
|
|
q = q.filter(db.Books.publishers.any(func.lower(db.Publishers.name).ilike("%" + publisher + "%")))
|
|
q = adv_search_tag(q, tags['include_tag'], tags['exclude_tag'])
|
|
q = adv_search_serie(q, tags['include_serie'], tags['exclude_serie'])
|
|
q = adv_search_shelf(q, tags['include_shelf'], tags['exclude_shelf'])
|
|
q = adv_search_extension(q, tags['include_extension'], tags['exclude_extension'])
|
|
q = adv_search_language(q, tags['include_language'], tags['exclude_language'])
|
|
q = adv_search_ratings(q, rating_high, rating_low)
|
|
|
|
if description:
|
|
q = q.filter(db.Books.comments.any(func.lower(db.Comments.text).ilike("%" + description + "%")))
|
|
|
|
# search custom columns
|
|
try:
|
|
q = adv_search_custom_columns(cc, term, q)
|
|
except AttributeError as ex:
|
|
log.debug_or_exception(ex)
|
|
flash(_("Error on search for custom columns, please restart Calibre-Web"), category="error")
|
|
|
|
q = q.order_by(*sort).all()
|
|
flask_session['query'] = json.dumps(term)
|
|
ub.store_combo_ids(q)
|
|
result_count = len(q)
|
|
if offset is not None and limit is not None:
|
|
offset = int(offset)
|
|
limit_all = offset + int(limit)
|
|
pagination = Pagination((offset / (int(limit)) + 1), limit, result_count)
|
|
else:
|
|
offset = 0
|
|
limit_all = result_count
|
|
entries = calibre_db.order_authors(q[offset:limit_all], list_return=True, combined=True)
|
|
return render_title_template('search.html',
|
|
adv_searchterm=search_term,
|
|
pagination=pagination,
|
|
entries=entries,
|
|
result_count=result_count,
|
|
title=_("Advanced Search"), page="advsearch",
|
|
order=order[1])
|
|
|
|
|
|
def render_prepare_search_form(cc):
|
|
# prepare data for search-form
|
|
tags = calibre_db.session.query(db.Tags)\
|
|
.join(db.books_tags_link)\
|
|
.join(db.Books)\
|
|
.filter(calibre_db.common_filters()) \
|
|
.group_by(text('books_tags_link.tag'))\
|
|
.order_by(db.Tags.name).all()
|
|
series = calibre_db.session.query(db.Series)\
|
|
.join(db.books_series_link)\
|
|
.join(db.Books)\
|
|
.filter(calibre_db.common_filters()) \
|
|
.group_by(text('books_series_link.series'))\
|
|
.order_by(db.Series.name)\
|
|
.filter(calibre_db.common_filters()).all()
|
|
shelves = ub.session.query(ub.Shelf)\
|
|
.filter(or_(ub.Shelf.is_public == 1, ub.Shelf.user_id == int(current_user.id)))\
|
|
.order_by(ub.Shelf.name).all()
|
|
extensions = calibre_db.session.query(db.Data)\
|
|
.join(db.Books)\
|
|
.filter(calibre_db.common_filters()) \
|
|
.group_by(db.Data.format)\
|
|
.order_by(db.Data.format).all()
|
|
if current_user.filter_language() == "all":
|
|
languages = calibre_db.speaking_language()
|
|
else:
|
|
languages = None
|
|
return render_title_template('search_form.html', tags=tags, languages=languages, extensions=extensions,
|
|
series=series,shelves=shelves, title=_("Advanced Search"), cc=cc, page="advsearch")
|
|
|
|
|
|
def render_search_results(term, offset=None, order=None, limit=None):
|
|
if term:
|
|
join = db.books_series_link, db.Books.id == db.books_series_link.c.book, db.Series
|
|
entries, result_count, pagination = calibre_db.get_search_results(term,
|
|
config,
|
|
offset,
|
|
order,
|
|
limit,
|
|
*join)
|
|
else:
|
|
entries = list()
|
|
order = [None, None]
|
|
pagination = result_count = None
|
|
|
|
return render_title_template('search.html',
|
|
searchterm=term,
|
|
pagination=pagination,
|
|
query=term,
|
|
adv_searchterm=term,
|
|
entries=entries,
|
|
result_count=result_count,
|
|
title=_("Search"),
|
|
page="search",
|
|
order=order[1])
|
|
|
|
|