You've already forked oncall
mirror of
https://github.com/linkedin/oncall.git
synced 2025-11-26 23:10:47 +02:00
Fix auth bugs for python 3
Also fix notifier bug to check for active users
This commit is contained in:
@@ -124,7 +124,7 @@ def check_calendar_auth_by_id(team_id, req):
|
||||
|
||||
def is_client_digest_valid(client_digest, api_key, window, method, path, body):
|
||||
text = '%s %s %s %s' % (window, method, path, body)
|
||||
HMAC = hmac.new(api_key, text, hashlib.sha512)
|
||||
HMAC = hmac.new(api_key, text.encode('utf-8'), hashlib.sha512)
|
||||
digest = base64.urlsafe_b64encode(HMAC.digest())
|
||||
if equals(client_digest, digest):
|
||||
return True
|
||||
@@ -139,7 +139,7 @@ def authenticate_application(auth_token, req):
|
||||
qs = req.env['QUERY_STRING']
|
||||
if qs:
|
||||
path = path + '?' + qs
|
||||
body = req.context['body']
|
||||
body = req.context['body'].decode('utf-8')
|
||||
try:
|
||||
app_name, client_digest = auth_token[5:].split(':', 1)
|
||||
if app_name not in app_key_cache:
|
||||
@@ -154,7 +154,7 @@ def authenticate_application(auth_token, req):
|
||||
cursor.close()
|
||||
connection.close()
|
||||
raise HTTPUnauthorized('Authentication failure', 'Application not found', '')
|
||||
api_key = str(app_key_cache[app_name])
|
||||
api_key = app_key_cache[app_name].encode('utf-8')
|
||||
window = int(time.time()) // 5
|
||||
if is_client_digest_valid(client_digest, api_key, window, method, path, body):
|
||||
req.context['app'] = app_name
|
||||
|
||||
@@ -71,7 +71,7 @@ def reminder(config):
|
||||
JOIN `team` ON `event`.`team_id` = `team`.`id`
|
||||
JOIN `role` ON `event`.`role_id` = `role`.`id`
|
||||
LEFT JOIN `event` AS `e` ON `event`.`link_id` = `e`.`link_id` AND `e`.`start` < `event`.`start`
|
||||
WHERE `e`.`id` IS NULL
|
||||
WHERE `e`.`id` IS NULL AND `user`.`active` = 1
|
||||
'''
|
||||
|
||||
while(1):
|
||||
|
||||
61
test/test_auth.py
Normal file
61
test/test_auth.py
Normal file
@@ -0,0 +1,61 @@
|
||||
from oncall.auth import login_required
|
||||
from oncall.app import ReqBodyMiddleware
|
||||
import falcon
|
||||
import falcon.testing
|
||||
import time
|
||||
import hmac
|
||||
import hashlib
|
||||
import base64
|
||||
|
||||
|
||||
class DummyAPI(object):
|
||||
|
||||
@login_required
|
||||
def on_get(self, req, resp):
|
||||
resp.content = 'GOOD'
|
||||
|
||||
@login_required
|
||||
def on_post(self, req, resp):
|
||||
resp.status = falcon.HTTP_201
|
||||
|
||||
|
||||
def test_application_auth(mocker):
|
||||
# Mock DB to get 'abc' as the dummy app key
|
||||
connect = mocker.MagicMock(name='dummyDB')
|
||||
cursor = mocker.MagicMock(name='dummyCursor', rowcount=1)
|
||||
cursor.fetchone.return_value = ['abc']
|
||||
connect.cursor.return_value = cursor
|
||||
db = mocker.MagicMock()
|
||||
db.connect.return_value = connect
|
||||
mocker.patch('oncall.auth.db', db)
|
||||
|
||||
# Set up dummy API for auth testing
|
||||
api = falcon.API(middleware=[ReqBodyMiddleware()])
|
||||
api.add_route('/dummy_path', DummyAPI())
|
||||
|
||||
# Test bad auth
|
||||
client = falcon.testing.TestClient(api)
|
||||
re = client.simulate_get('/dummy_path', headers={'AUTHORIZATION': 'hmac dummy:abc'})
|
||||
assert re.status_code == 401
|
||||
re = client.simulate_post('/dummy_path', json={'example': 'test'}, headers={'AUTHORIZATION': 'hmac dummy:abc'})
|
||||
assert re.status_code == 401
|
||||
|
||||
# Test good auth for GET request
|
||||
window = int(time.time()) // 5
|
||||
text = '%s %s %s %s' % (window, 'GET', '/dummy_path?abc=123', '')
|
||||
HMAC = hmac.new(b'abc', text.encode('utf-8'), hashlib.sha512)
|
||||
digest = base64.urlsafe_b64encode(HMAC.digest()).decode('utf-8')
|
||||
auth = 'hmac dummy:%s' % digest
|
||||
|
||||
re = client.simulate_get('/dummy_path', params={'abc': 123}, headers={'AUTHORIZATION': auth})
|
||||
assert re.status_code == 200
|
||||
|
||||
window = int(time.time()) // 5
|
||||
body = '{"example": "test"}'
|
||||
text = '%s %s %s %s' % (window, 'POST', '/dummy_path', body)
|
||||
HMAC = hmac.new(b'abc', text.encode('utf-8'), hashlib.sha512)
|
||||
digest = base64.urlsafe_b64encode(HMAC.digest()).decode('utf-8')
|
||||
auth = 'hmac dummy:%s' % digest
|
||||
|
||||
re = client.simulate_post('/dummy_path', body=body, headers={'AUTHORIZATION': auth})
|
||||
assert re.status_code == 201
|
||||
Reference in New Issue
Block a user