forked from redis/redis-py
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_ssl.py
242 lines (208 loc) · 8.22 KB
/
test_ssl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
import socket
import ssl
from urllib.parse import urlparse
import pytest
import redis
from redis.exceptions import ConnectionError, RedisError
from .conftest import skip_if_cryptography, skip_if_nocryptography
from .ssl_utils import get_ssl_filename
@pytest.mark.ssl
class TestSSL:
    """Tests for SSL connections.

    This relies on the --redis-ssl-url purely for rebuilding the client
    and connecting to the appropriate port.
    """

    SERVER_CERT = get_ssl_filename("server-cert.pem")
    SERVER_KEY = get_ssl_filename("server-key.pem")

    @staticmethod
    def _ssl_endpoint(request):
        """Return the netloc of --redis-ssl-url split on ":".

        The result is a list of strings; index 0 is used as the host and
        index 1 as the port by the tests below.
        """
        ssl_url = request.config.option.redis_ssl_url
        return urlparse(ssl_url).netloc.split(":")

    def _client_with_server_cert(self, request, **ssl_options):
        """Build a client that presents SERVER_CERT/SERVER_KEY and verifies.

        Any extra keyword arguments are forwarded unchanged to
        ``redis.Redis`` (e.g. ``ssl_ca_certs``, ``ssl_validate_ocsp``).
        """
        endpoint = self._ssl_endpoint(request)
        return redis.Redis(
            host=endpoint[0],
            port=endpoint[1],
            ssl=True,
            ssl_certfile=self.SERVER_CERT,
            ssl_keyfile=self.SERVER_KEY,
            ssl_cert_reqs="required",
            **ssl_options,
        )

    def test_ssl_with_invalid_cert(self, request):
        """A client built from the bare SSL URL fails certificate checks."""
        ssl_url = request.config.option.redis_ssl_url
        sslclient = redis.from_url(ssl_url)
        with pytest.raises(ConnectionError) as excinfo:
            sslclient.ping()
        # Checked after the block: statements placed after the raising call
        # inside ``pytest.raises`` would never execute.
        assert "SSL: CERTIFICATE_VERIFY_FAILED" in str(excinfo.value)
        sslclient.close()

    def test_ssl_connection(self, request):
        """Ping succeeds over SSL when certificate checks are disabled."""
        endpoint = self._ssl_endpoint(request)
        r = redis.Redis(
            host=endpoint[0], port=endpoint[1], ssl=True, ssl_cert_reqs="none"
        )
        assert r.ping()
        r.close()

    def test_ssl_connection_without_ssl(self, request):
        """A non-SSL client pointed at the SSL port gets a ConnectionError."""
        endpoint = self._ssl_endpoint(request)
        r = redis.Redis(host=endpoint[0], port=endpoint[1], ssl=False)
        with pytest.raises(ConnectionError) as excinfo:
            r.ping()
        assert "Connection closed by server" in str(excinfo.value)
        r.close()

    def test_validating_self_signed_certificate(self, request):
        """Ping succeeds when the server cert file is given as the CA file."""
        r = self._client_with_server_cert(request, ssl_ca_certs=self.SERVER_CERT)
        assert r.ping()
        r.close()

    def test_validating_self_signed_string_certificate(self, request):
        """Ping succeeds when the CA is supplied as in-memory text."""
        with open(self.SERVER_CERT) as cert_file:
            cert_data = cert_file.read()
        r = self._client_with_server_cert(request, ssl_ca_data=cert_data)
        assert r.ping()
        r.close()

    def _create_oscp_conn(self, request):
        """Return a verifying client with ``ssl_validate_ocsp`` enabled."""
        return self._client_with_server_cert(
            request,
            ssl_ca_certs=self.SERVER_CERT,
            ssl_validate_ocsp=True,
        )

    @skip_if_cryptography()
    def test_ssl_ocsp_called(self, request):
        """OCSP validation raises RedisError mentioning missing cryptography."""
        r = self._create_oscp_conn(request)
        with pytest.raises(RedisError) as excinfo:
            r.ping()
        assert "cryptography is not installed" in str(excinfo.value)
        r.close()

    @skip_if_nocryptography()
    def test_ssl_ocsp_called_withcrypto(self, request):
        """OCSP validation against the test certificate reports missing AIA."""
        r = self._create_oscp_conn(request)
        with pytest.raises(ConnectionError) as excinfo:
            assert r.ping()
        assert "No AIA information present in ssl certificate" in str(
            excinfo.value
        )
        r.close()

        # rediss://, url based
        ssl_url = request.config.option.redis_ssl_url
        sslclient = redis.from_url(ssl_url)
        # NOTE(review): the original text also listed the message
        # "No AIA information present in ssl certificate" for this client.
        # test_ssl_with_invalid_cert builds the identical from_url client and
        # expects "SSL: CERTIFICATE_VERIFY_FAILED", so both messages cannot
        # hold at once; only the exception type is checked here until the
        # intended message is confirmed.
        with pytest.raises(ConnectionError):
            sslclient.ping()
        sslclient.close()

    @skip_if_nocryptography()
    def test_valid_ocsp_cert_http(self):
        """OCSPVerifier reports each listed public host as valid."""
        from redis.ocsp import OCSPVerifier

        hostnames = ["github.com", "aws.amazon.com", "ynet.co.il"]
        for hostname in hostnames:
            context = ssl.create_default_context()
            with socket.create_connection(
                (hostname, 443)
            ) as sock, context.wrap_socket(
                sock, server_hostname=hostname
            ) as wrapped:
                ocsp = OCSPVerifier(wrapped, hostname, 443)
                assert ocsp.is_valid()

    @skip_if_nocryptography()
    def test_revoked_ocsp_certificate(self):
        """OCSPVerifier raises ConnectionError mentioning REVOKED."""
        from redis.ocsp import OCSPVerifier

        context = ssl.create_default_context()
        hostname = "revoked.badssl.com"
        with socket.create_connection(
            (hostname, 443)
        ) as sock, context.wrap_socket(
            sock, server_hostname=hostname
        ) as wrapped:
            ocsp = OCSPVerifier(wrapped, hostname, 443)
            with pytest.raises(ConnectionError) as excinfo:
                assert ocsp.is_valid()
            assert "REVOKED" in str(excinfo.value)

    @skip_if_nocryptography()
    def test_unauthorized_ocsp(self):
        """OCSPVerifier raises ConnectionError for the listed host."""
        from redis.ocsp import OCSPVerifier

        context = ssl.create_default_context()
        hostname = "stackoverflow.com"
        with socket.create_connection(
            (hostname, 443)
        ) as sock, context.wrap_socket(
            sock, server_hostname=hostname
        ) as wrapped:
            ocsp = OCSPVerifier(wrapped, hostname, 443)
            with pytest.raises(ConnectionError):
                ocsp.is_valid()

    @skip_if_nocryptography()
    def test_ocsp_not_present_in_response(self):
        """OCSPVerifier raises ConnectionError whose text contains "from the"."""
        from redis.ocsp import OCSPVerifier

        context = ssl.create_default_context()
        hostname = "google.co.il"
        with socket.create_connection(
            (hostname, 443)
        ) as sock, context.wrap_socket(
            sock, server_hostname=hostname
        ) as wrapped:
            ocsp = OCSPVerifier(wrapped, hostname, 443)
            with pytest.raises(ConnectionError) as excinfo:
                assert ocsp.is_valid()
            assert "from the" in str(excinfo.value)

    @skip_if_nocryptography()
    def test_unauthorized_then_direct(self):
        """OCSPVerifier ends up valid for hosts that first answer unauthorized."""
        from redis.ocsp import OCSPVerifier

        # these certificates on the socket end return unauthorized
        # then the second call succeeds
        hostnames = ["wikipedia.org", "squarespace.com"]
        for hostname in hostnames:
            context = ssl.create_default_context()
            with socket.create_connection(
                (hostname, 443)
            ) as sock, context.wrap_socket(
                sock, server_hostname=hostname
            ) as wrapped:
                ocsp = OCSPVerifier(wrapped, hostname, 443)
                assert ocsp.is_valid()

    @skip_if_nocryptography()
    def test_mock_ocsp_staple(self, request):
        """Exercise the stapled-OCSP options with three client setups."""
        import OpenSSL

        # 1) ssl_validate_ocsp together with an ssl_ocsp_context -> RedisError.
        r = self._client_with_server_cert(
            request,
            ssl_ca_certs=self.SERVER_CERT,
            ssl_validate_ocsp=True,
            ssl_ocsp_context=self._ssl_endpoint(request),  # just needs to not be none
        )
        with pytest.raises(RedisError):
            r.ping()
        r.close()

        # 2) Stapled validation with an explicit OpenSSL context.
        ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
        ctx.use_certificate_file(self.SERVER_CERT)
        ctx.use_privatekey_file(self.SERVER_KEY)
        # Read through a context manager so the file handle is closed.
        with open(self.SERVER_KEY, "rb") as key_file:
            expected_cert = key_file.read()
        r = self._client_with_server_cert(
            request,
            ssl_ca_certs=self.SERVER_CERT,
            ssl_ocsp_context=ctx,
            ssl_ocsp_expected_cert=expected_cert,
            ssl_validate_ocsp_stapled=True,
        )
        with pytest.raises(ConnectionError) as excinfo:
            r.ping()
        assert "no ocsp response present" in str(excinfo.value)
        r.close()

        # 3) Stapled validation without passing a context.
        r = self._client_with_server_cert(
            request,
            ssl_ca_certs=self.SERVER_CERT,
            ssl_validate_ocsp_stapled=True,
        )
        with pytest.raises(ConnectionError) as excinfo:
            r.ping()
        assert "no ocsp response present" in str(excinfo.value)
        r.close()