|
10 | 10 | # See the License for the specific language governing permissions and
|
11 | 11 | # limitations under the License.
|
12 | 12 |
|
| 13 | +import csv |
| 14 | +import io |
13 | 15 | import uuid
|
14 | 16 |
|
15 | 17 | import pretend
|
16 | 18 | import pytest
|
17 | 19 |
|
18 |
| -from pyramid.httpexceptions import HTTPBadRequest, HTTPNotFound |
| 20 | +from pyramid.httpexceptions import HTTPBadRequest, HTTPNotFound, HTTPSeeOther |
19 | 21 |
|
20 | 22 | from warehouse.admin.views import emails as views
|
21 | 23 |
|
| 24 | +from ....common.db.accounts import EmailFactory, UserFactory |
22 | 25 | from ....common.db.ses import EmailMessageFactory
|
23 | 26 |
|
24 | 27 |
|
@@ -73,6 +76,128 @@ def test_wildcard_query(self, db_request):
|
73 | 76 | assert result == {"emails": [emails[0]], "query": emails[0].to[:-1] + "%"}
|
74 | 77 |
|
75 | 78 |
|
| 79 | +class TestEmailMass: |
| 80 | + def test_sends_emails(self, db_request): |
| 81 | + user1 = UserFactory.create() |
| 82 | + email1 = EmailFactory.create(user=user1, primary=True) |
| 83 | + user2 = UserFactory.create() |
| 84 | + email2 = EmailFactory.create(user=user2, primary=True) |
| 85 | + |
| 86 | + input_file = io.BytesIO() |
| 87 | + wrapper = io.TextIOWrapper(input_file, encoding="utf-8") |
| 88 | + fieldnames = ["user_id", "subject", "body_text"] |
| 89 | + writer = csv.DictWriter(wrapper, fieldnames=fieldnames) |
| 90 | + writer.writeheader() |
| 91 | + writer.writerow( |
| 92 | + { |
| 93 | + "user_id": user1.id, |
| 94 | + "subject": "Test Subject 1", |
| 95 | + "body_text": "Test Body 1", |
| 96 | + } |
| 97 | + ) |
| 98 | + writer.writerow( |
| 99 | + { |
| 100 | + "user_id": user2.id, |
| 101 | + "subject": "Test Subject 2", |
| 102 | + "body_text": "Test Body 2", |
| 103 | + } |
| 104 | + ) |
| 105 | + wrapper.seek(0) |
| 106 | + |
| 107 | + delay = pretend.call_recorder(lambda *a, **kw: None) |
| 108 | + db_request.params = {"csvfile": pretend.stub(file=input_file)} |
| 109 | + db_request.task = lambda a: pretend.stub(delay=delay) |
| 110 | + db_request.route_path = pretend.call_recorder(lambda *a, **kw: "/the-redirect") |
| 111 | + db_request.session = pretend.stub( |
| 112 | + flash=pretend.call_recorder(lambda *a, **kw: None) |
| 113 | + ) |
| 114 | + |
| 115 | + result = views.email_mass(db_request) |
| 116 | + |
| 117 | + assert isinstance(result, HTTPSeeOther) |
| 118 | + assert db_request.route_path.calls == [pretend.call("admin.emails.list")] |
| 119 | + assert result.headers["Location"] == "/the-redirect" |
| 120 | + assert db_request.session.flash.calls == [ |
| 121 | + pretend.call("Mass emails sent", queue="success") |
| 122 | + ] |
| 123 | + assert delay.calls == [ |
| 124 | + pretend.call( |
| 125 | + email1.email, |
| 126 | + { |
| 127 | + "subject": "Test Subject 1", |
| 128 | + "body_text": "Test Body 1", |
| 129 | + "body_html": None, |
| 130 | + }, |
| 131 | + ), |
| 132 | + pretend.call( |
| 133 | + email2.email, |
| 134 | + { |
| 135 | + "subject": "Test Subject 2", |
| 136 | + "body_text": "Test Body 2", |
| 137 | + "body_html": None, |
| 138 | + }, |
| 139 | + ), |
| 140 | + ] |
| 141 | + |
| 142 | + def test_user_without_email_sends_no_emails(self, db_request): |
| 143 | + user = UserFactory.create() |
| 144 | + |
| 145 | + input_file = io.BytesIO() |
| 146 | + wrapper = io.TextIOWrapper(input_file, encoding="utf-8") |
| 147 | + fieldnames = ["user_id", "subject", "body_text"] |
| 148 | + writer = csv.DictWriter(wrapper, fieldnames=fieldnames) |
| 149 | + writer.writeheader() |
| 150 | + writer.writerow( |
| 151 | + { |
| 152 | + "user_id": user.id, |
| 153 | + "subject": "Test Subject 1", |
| 154 | + "body_text": "Test Body 1", |
| 155 | + } |
| 156 | + ) |
| 157 | + wrapper.seek(0) |
| 158 | + |
| 159 | + delay = pretend.call_recorder(lambda *a, **kw: None) |
| 160 | + db_request.params = {"csvfile": pretend.stub(file=input_file)} |
| 161 | + db_request.task = lambda a: pretend.stub(delay=delay) |
| 162 | + db_request.route_path = pretend.call_recorder(lambda *a, **kw: "/the-redirect") |
| 163 | + db_request.session = pretend.stub( |
| 164 | + flash=pretend.call_recorder(lambda *a, **kw: None) |
| 165 | + ) |
| 166 | + |
| 167 | + result = views.email_mass(db_request) |
| 168 | + |
| 169 | + assert isinstance(result, HTTPSeeOther) |
| 170 | + assert db_request.route_path.calls == [pretend.call("admin.emails.list")] |
| 171 | + assert result.headers["Location"] == "/the-redirect" |
| 172 | + assert delay.calls == [] |
| 173 | + |
| 174 | + def test_no_rows_sends_no_emails(self): |
| 175 | + input_file = io.BytesIO() |
| 176 | + wrapper = io.TextIOWrapper(input_file, encoding="utf-8") |
| 177 | + fieldnames = ["user_id", "subject", "body_text"] |
| 178 | + writer = csv.DictWriter(wrapper, fieldnames=fieldnames) |
| 179 | + writer.writeheader() |
| 180 | + wrapper.seek(0) |
| 181 | + |
| 182 | + delay = pretend.call_recorder(lambda *a, **kw: None) |
| 183 | + request = pretend.stub( |
| 184 | + params={"csvfile": pretend.stub(file=input_file)}, |
| 185 | + task=lambda a: pretend.stub(delay=delay), |
| 186 | + route_path=pretend.call_recorder(lambda *a, **kw: "/the-redirect"), |
| 187 | + session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)), |
| 188 | + ) |
| 189 | + |
| 190 | + result = views.email_mass(request) |
| 191 | + |
| 192 | + assert isinstance(result, HTTPSeeOther) |
| 193 | + assert request.route_path.calls == [pretend.call("admin.emails.list")] |
| 194 | + assert result.headers["Location"] == "/the-redirect" |
| 195 | + assert request.session.flash.calls == [ |
| 196 | + pretend.call("No emails to send", queue="error") |
| 197 | + ] |
| 198 | + assert delay.calls == [] |
| 199 | + |
| 200 | + |
76 | 201 | class TestEmailDetail:
|
77 | 202 | def test_existing_email(self, db_session):
|
78 | 203 | em = EmailMessageFactory.create()
|
|
0 commit comments