Skip to content

Commit b41e8d9

Browse files
doublethefish authored and Pierre-Sassoulas committed
mapreduce| Fixes -jN for map/reduce Checkers (e.g. SimilarChecker)
This integrates the map/reduce functionality into lint.check_process(). We previously had `map` being invoked; here we add `reduce` support. We do this by collecting the map-data by worker and then passing it to a reducer function on the Checker object, if available - determined by whether they conform to the `mapreduce_checker.MapReduceMixin` mixin interface or not. This allows Checker objects to function across file-streams when using multiprocessing/-j2+. For example, SimilarChecker needs to be able to compare data across all files. The tests, which we also add here, check that a Checker instance returns and reports expected data and errors, such as error messages and stats - at least in an exit-ok (0) situation. On a personal note, as we are copying more data across process boundaries, I suspect that the memory implications of this might cause issues for large projects already running with -jN and duplicate-code detection enabled. That said, given that it takes a long time to perform lints of large code bases, that is an issue for the [near?] future and likely to be part of the performance work. Either way, let's get it working first and deal with memory and performance considerations later - I say this because there are many quick wins we can make here, e.g. file-batching, hashing lines, data compression and so on.
1 parent 579b58d commit b41e8d9

File tree

6 files changed

+105
-21
lines changed

6 files changed

+105
-21
lines changed

pylint/checkers/__init__.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
# Copyright (c) 2018-2019 Pierre Sassoulas <[email protected]>
1111
# Copyright (c) 2018 ssolanki <[email protected]>
1212
# Copyright (c) 2019 Bruno P. Kinoshita <[email protected]>
13+
# Copyright (c) 2020 Frank Harrison <[email protected]>
1314

1415
# Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
1516
# For details: https://github.com/PyCQA/pylint/blob/master/COPYING
@@ -43,6 +44,7 @@
4344
"""
4445

4546
from pylint.checkers.base_checker import BaseChecker, BaseTokenChecker
47+
from pylint.checkers.mapreduce_checker import MapReduceMixin
4648
from pylint.utils import register_plugins
4749

4850

@@ -64,4 +66,10 @@ def initialize(linter):
6466
register_plugins(linter, __path__[0])
6567

6668

67-
__all__ = ["BaseChecker", "BaseTokenChecker", "initialize", "register_plugins"]
69+
__all__ = [
70+
"BaseChecker",
71+
"BaseTokenChecker",
72+
"initialize",
73+
"MapReduceMixin",
74+
"register_plugins",
75+
]

pylint/checkers/mapreduce_checker.py

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Copyright (c) 2020 Frank Harrison <[email protected]>
2+
3+
# Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
4+
# For details: https://github.com/PyCQA/pylint/blob/master/COPYING
5+
import abc
6+
7+
8+
class MapReduceMixin(metaclass=abc.ABCMeta):
9+
""" A mixin design to allow multiprocess/threaded runs of a Checker """
10+
11+
@abc.abstractmethod
12+
def get_map_data(self):
13+
""" Returns mergable/reducible data that will be examined """
14+
15+
@classmethod
16+
@abc.abstractmethod
17+
def reduce_map_data(cls, linter, data):
18+
""" For a given Checker, receives data for all mapped runs """

pylint/checkers/similar.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131

3232
import astroid
3333

34-
from pylint.checkers import BaseChecker, table_lines_from_stats
34+
from pylint.checkers import BaseChecker, MapReduceMixin, table_lines_from_stats
3535
from pylint.interfaces import IRawChecker
3636
from pylint.reporters.ureports.nodes import Table
3737
from pylint.utils import decoding_stream
@@ -302,7 +302,7 @@ def report_similarities(sect, stats, old_stats):
302302

303303

304304
# wrapper to get a pylint checker from the similar class
305-
class SimilarChecker(BaseChecker, Similar):
305+
class SimilarChecker(BaseChecker, Similar, MapReduceMixin):
306306
"""checks for similarities and duplicated code. This computation may be
307307
memory / CPU intensive, so you should disable it if you experiment some
308308
problems.

pylint/lint/parallel.py

+46-9
Original file line numberDiff line numberDiff line change
@@ -67,28 +67,59 @@ def _worker_check_single_file(file_item):
6767

6868
_worker_linter.open()
6969
_worker_linter.check_single_file(name, filepath, modname)
70-
70+
mapreduce_data = collections.defaultdict(list)
71+
for checker in _worker_linter.get_checkers():
72+
try:
73+
data = checker.get_map_data()
74+
except AttributeError:
75+
continue
76+
mapreduce_data[checker.name].append(data)
7177
msgs = [_get_new_args(m) for m in _worker_linter.reporter.messages]
7278
_worker_linter.reporter.reset()
7379
return (
80+
id(multiprocessing.current_process()),
7481
_worker_linter.current_name,
7582
filepath,
7683
_worker_linter.file_state.base_name,
7784
msgs,
7885
_worker_linter.stats,
7986
_worker_linter.msg_status,
87+
mapreduce_data,
8088
)
8189

8290

91+
def _merge_mapreduce_data(linter, all_mapreduce_data):
92+
""" Merges map/reduce data across workers, invoking relevant APIs on checkers """
93+
# First collate the data, preparing it so we can send it to the checkers for
94+
# validation. The intent here is to collect all the mapreduce data for all checker-
95+
# runs across processes - that will then be passed to a static method on the
96+
# checkers to be reduced and further processed.
97+
collated_map_reduce_data = collections.defaultdict(list)
98+
for linter_data in all_mapreduce_data.values():
99+
for run_data in linter_data:
100+
for checker_name, data in run_data.items():
101+
collated_map_reduce_data[checker_name].extend(data)
102+
103+
# Send the data to checkers that support/require consolidated data
104+
original_checkers = linter.get_checkers()
105+
for checker in original_checkers:
106+
if checker.name in collated_map_reduce_data:
107+
# Assume that if the check has returned map/reduce data that it has the
108+
# reducer function
109+
checker.reduce_map_data(linter, collated_map_reduce_data[checker.name])
110+
111+
83112
def check_parallel(linter, jobs, files, arguments=None):
84-
"""Use the given linter to lint the files with given amount of workers (jobs)"""
85-
# The reporter does not need to be passed to worker processess, i.e. the reporter does
86-
# not need to be pickleable
113+
"""Use the given linter to lint the files with given amount of workers (jobs)
114+
This splits the work filestream-by-filestream. If you need to do work across
115+
multiple files, as in the similarity-checker, then inherit from MapReduceMixin and
116+
implement the map/reduce mixin functionality"""
117+
# The reporter does not need to be passed to worker processes, i.e. the reporter does
87118
original_reporter = linter.reporter
88119
linter.reporter = None
89120

90121
# The linter is inherited by all the pool's workers, i.e. the linter
91-
# is identical to the linter object here. This is requred so that
122+
# is identical to the linter object here. This is required so that
92123
# a custom PyLinter object can be used.
93124
initializer = functools.partial(_worker_initialize, arguments=arguments)
94125
pool = multiprocessing.Pool(jobs, initializer=initializer, initargs=[linter])
@@ -97,30 +128,36 @@ def check_parallel(linter, jobs, files, arguments=None):
97128
# correct reporter
98129
linter.set_reporter(original_reporter)
99130
linter.open()
100-
101-
all_stats = []
102-
103131
try:
132+
all_stats = []
133+
all_mapreduce_data = collections.defaultdict(list)
134+
135+
# Maps each file to be worked on by a single _worker_check_single_file() call,
136+
# collecting any map/reduce data by checker module so that we can 'reduce' it
137+
# later.
104138
for (
139+
worker_idx, # used to merge map/reduce data across workers
105140
module,
106141
file_path,
107142
base_name,
108143
messages,
109144
stats,
110145
msg_status,
146+
mapreduce_data,
111147
) in pool.imap_unordered(_worker_check_single_file, files):
112148
linter.file_state.base_name = base_name
113149
linter.set_current_module(module, file_path)
114150
for msg in messages:
115151
msg = Message(*msg)
116152
linter.reporter.handle_message(msg)
117-
118153
all_stats.append(stats)
154+
all_mapreduce_data[worker_idx].append(mapreduce_data)
119155
linter.msg_status |= msg_status
120156
finally:
121157
pool.close()
122158
pool.join()
123159

160+
_merge_mapreduce_data(linter, all_mapreduce_data)
124161
linter.stats = _merge_stats(all_stats)
125162

126163
# Insert stats data to local checkers.

tests/test_check_parallel.py

+21-6
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,17 @@ def test_worker_check_single_file_uninitialised(self):
103103
def test_worker_check_single_file_no_checkers(self):
104104
linter = PyLinter(reporter=Reporter())
105105
worker_initialize(linter=linter)
106-
(name, _, _, msgs, stats, msg_status) = worker_check_single_file(
107-
_gen_file_data()
108-
)
106+
107+
(
108+
_, # proc-id
109+
name,
110+
_, # file_path
111+
_, # base_name
112+
msgs,
113+
stats,
114+
msg_status,
115+
_, # mapreduce_data
116+
) = worker_check_single_file(_gen_file_data())
109117
assert name == "--test-file_data-name-0--"
110118
assert [] == msgs
111119
no_errors_status = 0
@@ -140,9 +148,16 @@ def test_worker_check_sequential_checker(self):
140148
# Add the only checker we care about in this test
141149
linter.register_checker(SequentialTestChecker(linter))
142150

143-
(name, _, _, msgs, stats, msg_status) = worker_check_single_file(
144-
_gen_file_data()
145-
)
151+
(
152+
_, # proc-id
153+
name,
154+
_, # file_path
155+
_, # base_name
156+
msgs,
157+
stats,
158+
msg_status,
159+
_, # mapreduce_data
160+
) = worker_check_single_file(_gen_file_data())
146161

147162
# Ensure we return the same data as the single_file_no_checkers test
148163
assert name == "--test-file_data-name-0--"

tests/test_self.py

+9-3
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646

4747
import pytest
4848

49-
from pylint.constants import MAIN_CHECKER_NAME
49+
from pylint.constants import MAIN_CHECKER_NAME, MSG_TYPES_STATUS
5050
from pylint.lint import Run
5151
from pylint.reporters import JSONReporter
5252
from pylint.reporters.text import BaseReporter, ColorizedTextReporter, TextReporter
@@ -243,13 +243,19 @@ def test_no_out_encoding(self):
243243
)
244244

245245
def test_parallel_execution(self):
246+
out = StringIO()
246247
self._runtest(
247248
[
248249
"-j 2",
249250
join(HERE, "functional", "a", "arguments.py"),
250-
join(HERE, "functional", "a", "arguments.py"),
251251
],
252-
code=2,
252+
out=out,
253+
# We expect similarities to fail and an error
254+
code=MSG_TYPES_STATUS["E"],
255+
)
256+
assert (
257+
"Unexpected keyword argument 'fourth' in function call"
258+
in out.getvalue().strip()
253259
)
254260

255261
def test_parallel_execution_missing_arguments(self):

0 commit comments

Comments
 (0)