Skip to content

Commit 48fadf2

Browse files
author
David Read
committed
[1418] Cached report refactor - Now use DataPipe to trigger QA off archiver. Minor fixes to QA task - now runs ok.
1 parent 34ec31b commit 48fadf2

File tree

5 files changed

+90
-62
lines changed

5 files changed

+90
-62
lines changed

ckanext/qa/model.py

+3-7
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,11 @@ class QA(Base):
3737
updated = Column(types.DateTime)
3838

3939
def __repr__(self):
40-
if not self.error:
41-
summary = 'score=%s format=%s' % (self.openness_score, self.format)
42-
details = self.openness_score_reason
43-
else:
44-
summary = 'ERROR'
45-
details = self.error
40+
summary = 'score=%s format=%s' % (self.openness_score, self.format)
41+
details = self.openness_score_reason
4642
package = model.Package.get(self.package_id)
4743
package_name = package.name if package else '?%s?' % self.package_id
48-
return '<QA %s /dataset/%s/resource/%s%s>' % \
44+
return '<QA %s /dataset/%s/resource/%s %s>' % \
4945
(summary, package_name, self.resource_id, details)
5046

5147
def as_dict(self):

ckanext/qa/plugin.py

+17-16
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,20 @@
2020
import reports
2121
import logic
2222

23+
from ckanext.archiver.interfaces import IPipe
24+
2325
resource_dictize = model_dictize.resource_dictize
2426
send_task = celery_app.celery.send_task
2527

2628
log = logging.getLogger(__name__)
2729

30+
2831
class QAPlugin(p.SingletonPlugin):
2932
p.implements(p.IConfigurer, inherit=True)
3033
p.implements(p.IRoutes, inherit=True)
31-
p.implements(p.IDomainObjectModification, inherit=True)
32-
p.implements(p.IResourceUrlChange)
34+
p.implements(IPipe, inherit=True)
35+
#p.implements(p.IDomainObjectModification, inherit=True)
36+
#p.implements(p.IResourceUrlChange)
3337
p.implements(p.IActions)
3438
p.implements(p.IReportCache)
3539

@@ -92,22 +96,19 @@ def before_map(self, map):
9296

9397
return map
9498

95-
# IDomainObjectModification / IResourceUrlChange
99+
# IPipe
96100

97-
def notify(self, entity, operation=None):
98-
if not isinstance(entity, model.Resource):
101+
def receive_data(self, operation, **params):
102+
'''Receive notification from ckan-archiver that a resource has been archived.'''
103+
if not operation == 'archived':
99104
return
100-
resource = entity
101-
102-
if operation:
103-
if operation == model.DomainObjectOperation.new:
104-
# Resource created
105-
create_qa_update_task(resource, queue='priority')
106-
else:
107-
# Resource URL has changed.
108-
# If operation is None, resource URL has been changed because the
109-
# notify function in IResourceUrlChange only takes 1 parameter
110-
create_qa_update_task(resource, queue='priority')
105+
resource_id = params['resource_id']
106+
#cache_filepath = params['cached_filepath']
107+
108+
resource = model.Resource.get(resource_id)
109+
assert resource
110+
111+
create_qa_update_task(resource, queue='priority')
111112

112113
# IActions
113114

ckanext/qa/sniff_format.py

+14-12
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,6 @@ def get_zipped_format(filepath, log):
360360
log.info('Zip has no known extensions: %s', filepath)
361361
return Formats.by_display_name()['Zip']
362362

363-
print top_scoring_extension_counts.items()
364363
top_scoring_extension_counts = sorted(top_scoring_extension_counts.items(),
365364
key=lambda x: x[1])
366365
top_extension = top_scoring_extension_counts[-1][0]
@@ -372,17 +371,19 @@ def get_zipped_format(filepath, log):
372371
format_['container'] = Formats.by_display_name()['Zip']['display_name']
373372
log.info('Zipped file format detected: %s', format_['display_name'])
374373
return format_
375-
374+
375+
376376
def is_excel(filepath, log):
377377
try:
378-
book = xlrd.open_workbook(filepath)
378+
xlrd.open_workbook(filepath)
379379
except Exception, e:
380380
log.info('Not Excel - failed to load: %s %s', e, e.args)
381381
return False
382382
else:
383383
log.info('Excel file opened successfully')
384384
return True
385385

386+
386387
# same as the python 2.7 subprocess.check_output
387388
def check_output(*popenargs, **kwargs):
388389
if 'stdout' in kwargs:
@@ -425,7 +426,8 @@ def run_bsd_file(filepath, log):
425426
return format_
426427
log.info('"file" could not determine file format of "%s": %s',
427428
filepath, result)
428-
429+
430+
429431
def is_ttl(buf, log):
430432
'''If the buffer is a Turtle RDF file then return True.'''
431433
# Turtle spec: "Turtle documents may have the strings '@prefix' or '@base' (case dependent) near the beginning of the document."
@@ -465,12 +467,12 @@ def turtle_regex():
465467
does not support nested blank nodes, collection, sameas ('a' token)
466468
'''
467469
if not turtle_regex_:
468-
global turtle_regex_
469-
rdf_term = '(<[^ >]+>|_:\S+|".+?"(@\w+)?(\^\^\S+)?|\'.+?\'(@\w+)?(\^\^\S+)?|""".+?"""(@\w+)?(\^\^\S+)?|\'\'\'.+?\'\'\'(@\w+)?(\^\^\S+)?|[+-]?([0-9]+|[0-9]*\.[0-9]+)(E[+-]?[0-9]+)?|false|true)'
470-
471-
# simple case is: triple_re = '^T T T \.$'.replace('T', rdf_term)
472-
# but extend to deal with multiple predicate-objects:
473-
#triple = '^T T T\s*(;\s*T T\s*)*\.\s*$'.replace('T', rdf_term).replace(' ', '\s+')
474-
triple = '(^T|;)\s*T T\s*(;|\.\s*$)'.replace('T', rdf_term).replace(' ', '\s+')
475-
turtle_regex_ = re.compile(triple, re.MULTILINE)
470+
global turtle_regex_
471+
rdf_term = '(<[^ >]+>|_:\S+|".+?"(@\w+)?(\^\^\S+)?|\'.+?\'(@\w+)?(\^\^\S+)?|""".+?"""(@\w+)?(\^\^\S+)?|\'\'\'.+?\'\'\'(@\w+)?(\^\^\S+)?|[+-]?([0-9]+|[0-9]*\.[0-9]+)(E[+-]?[0-9]+)?|false|true)'
472+
473+
# simple case is: triple_re = '^T T T \.$'.replace('T', rdf_term)
474+
# but extend to deal with multiple predicate-objects:
475+
#triple = '^T T T\s*(;\s*T T\s*)*\.\s*$'.replace('T', rdf_term).replace(' ', '\s+')
476+
triple = '(^T|;)\s*T T\s*(;|\.\s*$)'.replace('T', rdf_term).replace(' ', '\s+')
477+
turtle_regex_ = re.compile(triple, re.MULTILINE)
476478
return turtle_regex_

ckanext/qa/tasks.py

+8-11
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import traceback
99

1010
import ckan.lib.celery_app as celery_app
11+
from ckan.lib.json import DateTimeJsonEncoder
1112
from ckanext.dgu.lib.formats import Formats
1213
from ckanext.qa.sniff_format import sniff_file_format
1314
from ckanext.archiver.model import Archival, Status
@@ -108,15 +109,15 @@ def update(ckan_ini_filepath, resource_id):
108109
raise QAError('Resource ID not found: %s' % resource_id)
109110
qa_result = resource_score(resource, log)
110111
log.info('Openness scoring: \n%r\n%r\n%r\n\n', qa_result, resource,
111-
resource['url'])
112+
resource.url)
112113
save_qa_result(resource.id, qa_result, log)
113114
log.info('CKAN updated with openness score')
114115
package = resource.resource_group.package if resource.resource_group else None
115116
if package:
116117
update_search_index(package.id, log)
117118
else:
118119
log.warning('Resource not connected to a package. Res: %r', resource)
119-
return json.dumps(qa_result)
120+
return json.dumps(qa_result, cls=DateTimeJsonEncoder)
120121
except Exception, e:
121122
log.error('Exception occurred during QA update: %s: %s',
122123
e.__class__.__name__, unicode(e))
@@ -132,7 +133,7 @@ def get_qa_format(resource_id):
132133
return q.format
133134

134135

135-
def resource_score(resource_id, log):
136+
def resource_score(resource, log):
136137
"""
137138
Score resource on Sir Tim Berners-Lee\'s five stars of openness.
138139
@@ -145,18 +146,15 @@ def resource_score(resource_id, log):
145146
146147
Raises QAError for reasonable errors
147148
"""
148-
from ckan import model
149-
150149
score = 0
151150
score_reason = ''
152151
format_ = None
153152

154153
try:
155154
score_reasons = [] # a list of strings detailing how we scored it
156-
archival = Archival.get_for_resource(resource_id=resource_id)
157-
resource = model.Resource.get(resource_id)
155+
archival = Archival.get_for_resource(resource_id=resource.id)
158156
if not resource:
159-
raise QAError('Could not find resource "%s"' % resource_id)
157+
raise QAError('Could not find resource "%s"' % resource.id)
160158

161159
score, format_ = score_if_link_broken(archival, resource, score_reasons, log)
162160
if score == None:
@@ -182,8 +180,7 @@ def resource_score(resource_id, log):
182180
except Exception, e:
183181
log.error('Unexpected error while calculating openness score %s: %s\nException: %s', e.__class__.__name__, unicode(e), traceback.format_exc())
184182
score_reason = "Unknown error: %s" % str(e)
185-
if os.environ.get('DEBUG'):
186-
raise
183+
raise
187184

188185
# Even if we can get the link, we should still treat the resource
189186
# as having a score of 0 if the license isn't open.
@@ -398,7 +395,7 @@ def save_qa_result(resource_id, qa_result, log):
398395
else:
399396
log.info('QA from before: %r', qa)
400397

401-
for key in ('openness_score', 'openness_reason', 'format'):
398+
for key in ('openness_score', 'openness_score_reason', 'format'):
402399
setattr(qa, key, qa_result[key])
403400
qa.archival_timestamp == qa_result['archival_timestamp']
404401
qa.updated = now

tests/test_tasks.py

+48-16
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
import requests
2-
import json
32
import logging
4-
import os
53
import urllib
64
import datetime
75

8-
from nose.tools import raises, assert_equal
6+
from nose.tools import assert_equal
97
from ckan import model
108
from ckan.tests import BaseCase
119
from ckan.logic import get_action
1210

1311
import ckanext.qa.tasks
1412
from ckanext.qa.tasks import resource_score, extension_variants
1513
import ckanext.archiver
14+
import ckanext.archiver.tasks
1615
from ckanext.dgu.lib.formats import Formats
1716
from ckanext.qa import model as qa_model
1817
from ckanext.archiver import model as archiver_model
@@ -43,6 +42,43 @@ def set_sniffed_format(format_display_name):
4342

4443
TODAY = datetime.datetime(year=2008, month=10, day=10)
4544

45+
class TestTask(BaseCase):
46+
47+
@classmethod
48+
def setup_class(cls):
49+
archiver_model.init_tables(model.meta.engine)
50+
qa_model.init_tables(model.meta.engine)
51+
52+
def teardown(self):
53+
model.repo.rebuild_db()
54+
55+
def test_trigger_on_archival(cls):
56+
# create package
57+
context = {'model': model, 'ignore_auth': True, 'session': model.Session, 'user': 'test'}
58+
pkg = {'name': 'testpkg', 'license_id': 'uk-ogl', 'resources': [
59+
{'url': 'http://test.com/', 'format': 'CSV', 'description': 'Test'}
60+
]}
61+
pkg = get_action('package_create')(context, pkg)
62+
resource_dict = pkg['resources'][0]
63+
res_id = resource_dict['id']
64+
# create record of archival
65+
archival = Archival.create(res_id)
66+
cache_filepath = __file__ # just needs to exist
67+
archival.cache_filepath = cache_filepath
68+
archival.updated = TODAY
69+
model.Session.add(archival)
70+
model.Session.commit()
71+
# TODO show that QA hasn't run yet
72+
73+
# create a send_data from ckanext-archiver, that gets picked up by
74+
# ckanext-qa to put a task on the queue
75+
ckanext.archiver.tasks.notify(resource_dict, cache_filepath)
76+
# this is useful on its own (without any asserts) because it checks
77+
# there are no exceptions when running it
78+
79+
# TODO run celery and check it actually ran...
80+
81+
4682
class TestResourceScore(BaseCase):
4783

4884
@classmethod
@@ -60,11 +96,7 @@ def setup_class(cls):
6096
}
6197

6298
def teardown(self):
63-
pkg = model.Package.get(u'testpkg')
64-
if pkg:
65-
model.repo.new_revision()
66-
pkg.purge()
67-
model.repo.commit_and_remove()
99+
model.repo.rebuild_db()
68100

69101
def _test_resource(self, url='anything', format='TXT', archived=True, cached=True, license_id='uk-ogl'):
70102
context = {'model': model, 'ignore_auth': True, 'session': model.Session, 'user': 'test'}
@@ -79,7 +111,7 @@ def _test_resource(self, url='anything', format='TXT', archived=True, cached=Tru
79111
archival.updated = TODAY
80112
model.Session.add(archival)
81113
model.Session.commit()
82-
return res_id
114+
return model.Resource.get(res_id)
83115

84116
@classmethod
85117
def _set_task_status(cls, task_type, task_status_str):
@@ -173,16 +205,16 @@ def test_available_but_not_open(self):
173205
assert 'License not open' in result['openness_score_reason'], result
174206

175207
def test_not_available_and_not_open(self):
176-
res_id = self._test_resource(license_id=None, format=None, cached=False)
177-
archival = Archival.get_for_resource(res_id)
208+
res = self._test_resource(license_id=None, format=None, cached=False)
209+
archival = Archival.get_for_resource(res.id)
178210
archival.status_id = Status.by_text('Download error')
179211
archival.reason = 'Server returned 500 error'
180212
archival.last_success = None
181213
archival.first_failure = datetime.datetime(year=2008, month=10, day=1, hour=6, minute=30)
182214
archival.failure_count = 16
183215
archival.is_broken = True
184216
model.Session.commit()
185-
result = resource_score(res_id, log)
217+
result = resource_score(res, log)
186218
assert result['openness_score'] == 0, result
187219
assert_equal(result['format'], None)
188220
# in preference it should report that it is not available
@@ -192,22 +224,22 @@ def test_not_available_any_more(self):
192224
# A cache of the data still exists from the previous run, but this
193225
# time, the archiver found the file gave a 404.
194226
# The record of the previous (successful) run of QA.
195-
res_id = self._test_resource(license_id=None, format=None)
196-
qa = qa_model.QA.create(res_id)
227+
res = self._test_resource(license_id=None, format=None)
228+
qa = qa_model.QA.create(res.id)
197229
qa.format = 'CSV'
198230
model.Session.add(qa)
199231
model.Session.commit()
200232
# cache still exists from the previous run, but this time, the archiver
201233
# found the file gave a 404.
202-
archival = Archival.get_for_resource(res_id)
234+
archival = Archival.get_for_resource(res.id)
203235
archival.cache_filepath = __file__
204236
archival.status_id = Status.by_text('Download error')
205237
archival.reason = 'Server returned 404 error'
206238
archival.last_success = datetime.datetime(year=2008, month=10, day=1)
207239
archival.first_failure = datetime.datetime(year=2008, month=10, day=2)
208240
archival.failure_count = 1
209241
archival.is_broken = True
210-
result = resource_score(res_id, log)
242+
result = resource_score(res, log)
211243
assert result['openness_score'] == 0, result
212244
assert_equal(result['format'], 'CSV')
213245
# in preference it should report that it is not available

0 commit comments

Comments
 (0)