1
0
mirror of https://github.com/linkedin/oncall.git synced 2025-11-29 23:38:17 +02:00

add api endpoints to invalidate ical keys (#302)

* add api endpoints to invalidate ical keys

* invalidate ical_key during user-sync process
This commit is contained in:
Colin Yang
2020-03-10 13:39:33 -07:00
committed by GitHub
parent d54b76ecf4
commit 1ad2de4d90
6 changed files with 169 additions and 2 deletions

View File

@@ -102,9 +102,12 @@ def init(application, config):
application.add_route('/api/v0/users/{user_name}/ical', user_ical) application.add_route('/api/v0/users/{user_name}/ical', user_ical)
application.add_route('/api/v0/teams/{team}/ical', team_ical) application.add_route('/api/v0/teams/{team}/ical', team_ical)
from . import ical_key_user, ical_key_team from . import ical_key_user, ical_key_team, ical_key_detail, ical_key_requester
application.add_route('/api/v0/ical_key/user/{user_name}', ical_key_user) application.add_route('/api/v0/ical_key/user/{user_name}', ical_key_user)
application.add_route('/api/v0/ical_key/team/{team}', ical_key_team) application.add_route('/api/v0/ical_key/team/{team}', ical_key_team)
# available to admin only
application.add_route('/api/v0/ical_key/key/{key}', ical_key_detail)
application.add_route('/api/v0/ical_key/requester/{requester}', ical_key_requester)
from . import public_ical from . import public_ical
application.add_route('/api/v0/ical/{key}', public_ical) application.add_route('/api/v0/ical/{key}', public_ical)

View File

@@ -15,7 +15,7 @@ def get_name_and_type_from_key(key):
FROM `ical_key` FROM `ical_key`
WHERE `key` = %s WHERE `key` = %s
''', ''',
(key)) (key, ))
if cursor.rowcount != 0: if cursor.rowcount != 0:
row = cursor.fetchone() row = cursor.fetchone()
result = (row[0], row[1]) result = (row[0], row[1])
@@ -83,3 +83,79 @@ def delete_ical_key(requester, name, type):
cursor.close() cursor.close()
connection.close() connection.close()
#####
# admin actions below
#####
def get_ical_key_detail(key):
connection = db.connect()
cursor = connection.cursor(db.DictCursor)
cursor.execute(
'''
SELECT `requester`, `name`, `type`, `time_created`
FROM `ical_key`
WHERE `key` = %s
''',
(key, ))
# fetchall because we may want to know if there is any key (uuid) collision
results = cursor.fetchall()
cursor.close()
connection.close()
return results
def get_ical_key_detail_by_requester(requester):
connection = db.connect()
cursor = connection.cursor(db.DictCursor)
cursor.execute(
'''
SELECT `key`, `name`, `type`, `time_created`
FROM `ical_key`
WHERE `requester` = %s
''',
(requester, ))
results = cursor.fetchall()
cursor.close()
connection.close()
return results
def invalidate_ical_key(key):
connection = db.connect()
cursor = connection.cursor()
cursor.execute(
'''
DELETE FROM `ical_key`
WHERE
`key` = %s
''',
(key, ))
connection.commit()
cursor.close()
connection.close()
def invalidate_ical_key_by_requester(requester):
connection = db.connect()
cursor = connection.cursor()
cursor.execute(
'''
DELETE FROM `ical_key`
WHERE
`requester` = %s
''',
(requester, ))
connection.commit()
cursor.close()
connection.close()

View File

@@ -0,0 +1,37 @@
# Copyright (c) LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
# See LICENSE in the project root for license information.
from falcon import HTTPNotFound, HTTPForbidden
from ujson import dumps as json_dumps
from ...auth import login_required, check_ical_key_admin
from .ical_key import get_ical_key_detail, invalidate_ical_key
@login_required
def on_get(req, resp, key):
challenger = req.context['user']
if not check_ical_key_admin(challenger):
raise HTTPForbidden(
'Unauthorized',
'Action not allowed: "%s" is not an admin of ical_key' % (challenger, ),
)
results = get_ical_key_detail(key)
if not results:
raise HTTPNotFound()
resp.body = json_dumps(results)
resp.set_header('Content-Type', 'application/json')
@login_required
def on_delete(req, resp, key):
challenger = req.context['user']
if not check_ical_key_admin(challenger):
raise HTTPForbidden(
'Unauthorized',
'Action not allowed: "%s" is not an admin of ical_key' % (challenger, ),
)
invalidate_ical_key(key)

View File

@@ -0,0 +1,40 @@
# Copyright (c) LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
# See LICENSE in the project root for license information.
from falcon import HTTPNotFound, HTTPForbidden
from ujson import dumps as json_dumps
from ...auth import login_required, check_ical_key_admin
from .ical_key import (
get_ical_key_detail_by_requester,
invalidate_ical_key_by_requester,
)
@login_required
def on_get(req, resp, requester):
challenger = req.context['user']
if not check_ical_key_admin(challenger):
raise HTTPForbidden(
'Unauthorized',
'Action not allowed: "%s" is not an admin of ical_key' % (challenger, ),
)
results = get_ical_key_detail_by_requester(requester)
if not results:
raise HTTPNotFound()
resp.body = json_dumps(results)
resp.set_header('Content-Type', 'application/json')
@login_required
def on_delete(req, resp, requester):
challenger = req.context['user']
if not check_ical_key_admin(challenger):
raise HTTPForbidden(
'Unauthorized',
'Action not allowed: "%s" is not an admin of ical_key' % (challenger, ),
)
invalidate_ical_key_by_requester(requester)

View File

@@ -32,6 +32,10 @@ def is_god(challenger):
return is_god != 0 return is_god != 0
def check_ical_key_admin(challenger):
return is_god(challenger)
def check_user_auth(user, req): def check_user_auth(user, req):
""" """
Check to see if current user is user or admin of team where user is in Check to see if current user is user or admin of team where user is in

View File

@@ -88,6 +88,13 @@ def prune_user(engine, username):
logger.error('Deleting user %s failed: %s', username, e) logger.error('Deleting user %s failed: %s', username, e)
stats['sql_errors'] += 1 stats['sql_errors'] += 1
try:
engine.execute('DELETE FROM `ical_key` WHERE `requester` = %s', username)
logger.info('Invalidated ical_key of inactive user %s', username)
except Exception as e:
logger.error('Invalidating ical_key of inactive user %s failed: %s', username, e)
stats['sql_errors'] += 1
def fetch_ldap(): def fetch_ldap():
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_ALLOW) ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_ALLOW)