You've already forked httpie-cli
mirror of
https://github.com/httpie/cli.git
synced 2026-04-26 20:02:11 +02:00
1274d869f6
Since Python 3, the mock dependency is no more required as it is already part of the unittest module.
193 lines
6.7 KiB
Python
193 lines
6.7 KiB
Python
import os
|
|
import tempfile
|
|
import time
|
|
from unittest import mock
|
|
from urllib.request import urlopen
|
|
|
|
import pytest
|
|
from requests.structures import CaseInsensitiveDict
|
|
|
|
from httpie.downloads import (
|
|
parse_content_range, filename_from_content_disposition, filename_from_url,
|
|
get_unique_filename, ContentRangeError, Downloader,
|
|
)
|
|
from utils import http, MockEnvironment
|
|
|
|
|
|
class Response:
|
|
# noinspection PyDefaultArgument
|
|
def __init__(self, url, headers={}, status_code=200):
|
|
self.url = url
|
|
self.headers = CaseInsensitiveDict(headers)
|
|
self.status_code = status_code
|
|
|
|
|
|
class TestDownloadUtils:
|
|
|
|
def test_Content_Range_parsing(self):
|
|
parse = parse_content_range
|
|
|
|
assert parse('bytes 100-199/200', 100) == 200
|
|
assert parse('bytes 100-199/*', 100) == 200
|
|
|
|
# single byte
|
|
assert parse('bytes 100-100/*', 100) == 101
|
|
|
|
# missing
|
|
pytest.raises(ContentRangeError, parse, None, 100)
|
|
|
|
# syntax error
|
|
pytest.raises(ContentRangeError, parse, 'beers 100-199/*', 100)
|
|
|
|
# unexpected range
|
|
pytest.raises(ContentRangeError, parse, 'bytes 100-199/*', 99)
|
|
|
|
# invalid instance-length
|
|
pytest.raises(ContentRangeError, parse, 'bytes 100-199/199', 100)
|
|
|
|
# invalid byte-range-resp-spec
|
|
pytest.raises(ContentRangeError, parse, 'bytes 100-99/199', 100)
|
|
|
|
@pytest.mark.parametrize('header, expected_filename', [
|
|
('attachment; filename=hello-WORLD_123.txt', 'hello-WORLD_123.txt'),
|
|
('attachment; filename=".hello-WORLD_123.txt"', 'hello-WORLD_123.txt'),
|
|
('attachment; filename="white space.txt"', 'white space.txt'),
|
|
(r'attachment; filename="\"quotes\".txt"', '"quotes".txt'),
|
|
('attachment; filename=/etc/hosts', 'hosts'),
|
|
('attachment; filename=', None)
|
|
])
|
|
def test_Content_Disposition_parsing(self, header, expected_filename):
|
|
assert filename_from_content_disposition(header) == expected_filename
|
|
|
|
def test_filename_from_url(self):
|
|
assert 'foo.txt' == filename_from_url(
|
|
url='http://example.org/foo',
|
|
content_type='text/plain'
|
|
)
|
|
assert 'foo.html' == filename_from_url(
|
|
url='http://example.org/foo',
|
|
content_type='text/html; charset=utf8'
|
|
)
|
|
assert 'foo' == filename_from_url(
|
|
url='http://example.org/foo',
|
|
content_type=None
|
|
)
|
|
assert 'foo' == filename_from_url(
|
|
url='http://example.org/foo',
|
|
content_type='x-foo/bar'
|
|
)
|
|
|
|
@pytest.mark.parametrize(
|
|
'orig_name, unique_on_attempt, expected',
|
|
[
|
|
# Simple
|
|
('foo.bar', 0, 'foo.bar'),
|
|
('foo.bar', 1, 'foo.bar-1'),
|
|
('foo.bar', 10, 'foo.bar-10'),
|
|
# Trim
|
|
('A' * 20, 0, 'A' * 10),
|
|
('A' * 20, 1, 'A' * 8 + '-1'),
|
|
('A' * 20, 10, 'A' * 7 + '-10'),
|
|
# Trim before ext
|
|
('A' * 20 + '.txt', 0, 'A' * 6 + '.txt'),
|
|
('A' * 20 + '.txt', 1, 'A' * 4 + '.txt-1'),
|
|
# Trim at the end
|
|
('foo.' + 'A' * 20, 0, 'foo.' + 'A' * 6),
|
|
('foo.' + 'A' * 20, 1, 'foo.' + 'A' * 4 + '-1'),
|
|
('foo.' + 'A' * 20, 10, 'foo.' + 'A' * 3 + '-10'),
|
|
]
|
|
)
|
|
@mock.patch('httpie.downloads.get_filename_max_length')
|
|
def test_unique_filename(self, get_filename_max_length,
|
|
orig_name, unique_on_attempt,
|
|
expected):
|
|
|
|
def attempts(unique_on_attempt=0):
|
|
# noinspection PyUnresolvedReferences,PyUnusedLocal
|
|
def exists(filename):
|
|
if exists.attempt == unique_on_attempt:
|
|
return False
|
|
exists.attempt += 1
|
|
return True
|
|
|
|
exists.attempt = 0
|
|
return exists
|
|
|
|
get_filename_max_length.return_value = 10
|
|
|
|
actual = get_unique_filename(orig_name, attempts(unique_on_attempt))
|
|
assert expected == actual
|
|
|
|
|
|
class TestDownloads:
|
|
# TODO: more tests
|
|
|
|
def test_actual_download(self, httpbin_both, httpbin):
|
|
robots_txt = '/robots.txt'
|
|
body = urlopen(httpbin + robots_txt).read().decode()
|
|
env = MockEnvironment(stdin_isatty=True, stdout_isatty=False)
|
|
r = http('--download', httpbin_both.url + robots_txt, env=env)
|
|
assert 'Downloading' in r.stderr
|
|
assert '[K' in r.stderr
|
|
assert 'Done' in r.stderr
|
|
assert body == r
|
|
|
|
def test_download_with_Content_Length(self, httpbin_both):
|
|
with open(os.devnull, 'w') as devnull:
|
|
downloader = Downloader(output_file=devnull, progress_file=devnull)
|
|
downloader.start(
|
|
initial_url='/',
|
|
final_response=Response(
|
|
url=httpbin_both.url + '/',
|
|
headers={'Content-Length': 10}
|
|
)
|
|
)
|
|
time.sleep(1.1)
|
|
downloader.chunk_downloaded(b'12345')
|
|
time.sleep(1.1)
|
|
downloader.chunk_downloaded(b'12345')
|
|
downloader.finish()
|
|
assert not downloader.interrupted
|
|
downloader._progress_reporter.join()
|
|
|
|
def test_download_no_Content_Length(self, httpbin_both):
|
|
with open(os.devnull, 'w') as devnull:
|
|
downloader = Downloader(output_file=devnull, progress_file=devnull)
|
|
downloader.start(
|
|
final_response=Response(url=httpbin_both.url + '/'),
|
|
initial_url='/'
|
|
)
|
|
time.sleep(1.1)
|
|
downloader.chunk_downloaded(b'12345')
|
|
downloader.finish()
|
|
assert not downloader.interrupted
|
|
downloader._progress_reporter.join()
|
|
|
|
def test_download_interrupted(self, httpbin_both):
|
|
with open(os.devnull, 'w') as devnull:
|
|
downloader = Downloader(output_file=devnull, progress_file=devnull)
|
|
downloader.start(
|
|
final_response=Response(
|
|
url=httpbin_both.url + '/',
|
|
headers={'Content-Length': 5}
|
|
),
|
|
initial_url='/'
|
|
)
|
|
downloader.chunk_downloaded(b'1234')
|
|
downloader.finish()
|
|
assert downloader.interrupted
|
|
downloader._progress_reporter.join()
|
|
|
|
def test_download_with_redirect_original_url_used_for_filename(self, httpbin):
|
|
# Redirect from `/redirect/1` to `/get`.
|
|
expected_filename = '1.json'
|
|
orig_cwd = os.getcwd()
|
|
with tempfile.TemporaryDirectory() as tmp_dirname:
|
|
os.chdir(tmp_dirname)
|
|
try:
|
|
assert os.listdir('.') == []
|
|
http('--download', httpbin.url + '/redirect/1')
|
|
assert os.listdir('.') == [expected_filename]
|
|
finally:
|
|
os.chdir(orig_cwd)
|