mirror of
https://github.com/janeczku/calibre-web.git
synced 2025-01-08 04:04:09 +02:00
4230226716
Fixes reader button visible in detail view Fix formats to convert (added htmlz) Fix logger in updater Added request "v3" of github api on update Fix quotes parameter on external calls E-Mail logger working more stable (also on python3) Routing fixes Change import in ub
138 lines
5.7 KiB
Python
138 lines
5.7 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from flask import session
|
|
try:
|
|
from flask_dance.consumer.backend.sqla import SQLAlchemyBackend, first, _get_real_user
|
|
from sqlalchemy.orm.exc import NoResultFound
|
|
|
|
class OAuthBackend(SQLAlchemyBackend):
|
|
"""
|
|
Stores and retrieves OAuth tokens using a relational database through
|
|
the `SQLAlchemy`_ ORM.
|
|
|
|
.. _SQLAlchemy: http://www.sqlalchemy.org/
|
|
"""
|
|
def __init__(self, model, session,
|
|
user=None, user_id=None, user_required=None, anon_user=None,
|
|
cache=None):
|
|
super(OAuthBackend, self).__init__(model, session, user, user_id, user_required, anon_user, cache)
|
|
|
|
def get(self, blueprint, user=None, user_id=None):
|
|
if blueprint.name + '_oauth_token' in session and session[blueprint.name + '_oauth_token'] != '':
|
|
return session[blueprint.name + '_oauth_token']
|
|
# check cache
|
|
cache_key = self.make_cache_key(blueprint=blueprint, user=user, user_id=user_id)
|
|
token = self.cache.get(cache_key)
|
|
if token:
|
|
return token
|
|
|
|
# if not cached, make database queries
|
|
query = (
|
|
self.session.query(self.model)
|
|
.filter_by(provider=blueprint.name)
|
|
)
|
|
uid = first([user_id, self.user_id, blueprint.config.get("user_id")])
|
|
u = first(_get_real_user(ref, self.anon_user)
|
|
for ref in (user, self.user, blueprint.config.get("user")))
|
|
|
|
use_provider_user_id = False
|
|
if blueprint.name + '_oauth_user_id' in session and session[blueprint.name + '_oauth_user_id'] != '':
|
|
query = query.filter_by(provider_user_id=session[blueprint.name + '_oauth_user_id'])
|
|
use_provider_user_id = True
|
|
|
|
if self.user_required and not u and not uid and not use_provider_user_id:
|
|
#raise ValueError("Cannot get OAuth token without an associated user")
|
|
return None
|
|
# check for user ID
|
|
if hasattr(self.model, "user_id") and uid:
|
|
query = query.filter_by(user_id=uid)
|
|
# check for user (relationship property)
|
|
elif hasattr(self.model, "user") and u:
|
|
query = query.filter_by(user=u)
|
|
# if we have the property, but not value, filter by None
|
|
elif hasattr(self.model, "user_id"):
|
|
query = query.filter_by(user_id=None)
|
|
# run query
|
|
try:
|
|
token = query.one().token
|
|
except NoResultFound:
|
|
token = None
|
|
|
|
# cache the result
|
|
self.cache.set(cache_key, token)
|
|
|
|
return token
|
|
|
|
def set(self, blueprint, token, user=None, user_id=None):
|
|
uid = first([user_id, self.user_id, blueprint.config.get("user_id")])
|
|
u = first(_get_real_user(ref, self.anon_user)
|
|
for ref in (user, self.user, blueprint.config.get("user")))
|
|
|
|
if self.user_required and not u and not uid:
|
|
raise ValueError("Cannot set OAuth token without an associated user")
|
|
|
|
# if there was an existing model, delete it
|
|
existing_query = (
|
|
self.session.query(self.model)
|
|
.filter_by(provider=blueprint.name)
|
|
)
|
|
# check for user ID
|
|
has_user_id = hasattr(self.model, "user_id")
|
|
if has_user_id and uid:
|
|
existing_query = existing_query.filter_by(user_id=uid)
|
|
# check for user (relationship property)
|
|
has_user = hasattr(self.model, "user")
|
|
if has_user and u:
|
|
existing_query = existing_query.filter_by(user=u)
|
|
# queue up delete query -- won't be run until commit()
|
|
existing_query.delete()
|
|
# create a new model for this token
|
|
kwargs = {
|
|
"provider": blueprint.name,
|
|
"token": token,
|
|
}
|
|
if has_user_id and uid:
|
|
kwargs["user_id"] = uid
|
|
if has_user and u:
|
|
kwargs["user"] = u
|
|
self.session.add(self.model(**kwargs))
|
|
# commit to delete and add simultaneously
|
|
self.session.commit()
|
|
# invalidate cache
|
|
self.cache.delete(self.make_cache_key(
|
|
blueprint=blueprint, user=user, user_id=user_id
|
|
))
|
|
|
|
def delete(self, blueprint, user=None, user_id=None):
|
|
query = (
|
|
self.session.query(self.model)
|
|
.filter_by(provider=blueprint.name)
|
|
)
|
|
uid = first([user_id, self.user_id, blueprint.config.get("user_id")])
|
|
u = first(_get_real_user(ref, self.anon_user)
|
|
for ref in (user, self.user, blueprint.config.get("user")))
|
|
|
|
if self.user_required and not u and not uid:
|
|
raise ValueError("Cannot delete OAuth token without an associated user")
|
|
|
|
# check for user ID
|
|
if hasattr(self.model, "user_id") and uid:
|
|
query = query.filter_by(user_id=uid)
|
|
# check for user (relationship property)
|
|
elif hasattr(self.model, "user") and u:
|
|
query = query.filter_by(user=u)
|
|
# if we have the property, but not value, filter by None
|
|
elif hasattr(self.model, "user_id"):
|
|
query = query.filter_by(user_id=None)
|
|
# run query
|
|
query.delete()
|
|
self.session.commit()
|
|
# invalidate cache
|
|
self.cache.delete(self.make_cache_key(
|
|
blueprint=blueprint, user=user, user_id=user_id,
|
|
))
|
|
|
|
except ImportError:
|
|
pass
|