Skip to content

Commit bcfd6e7

Browse files
committed
mapreduce| Fixes -jN for map/reduce Checkers (e.g. SimilarChecker)
This integrate the map/reduce functionality into lint.check_process(). We previously had `map` being invoked, here we add `reduce` support. We do this by collecting the map-data by worker and then passing it to a reducer function on the Checker object, if available - determined by whether they confirm to the `mapreduce_checker.MapReduceMixin` mixin interface or nor. This allows Checker objects to function across file-streams when using multiprocessing/-j2+. For example SimilarChecker needs to be able to compare data across all files. The tests, that we also add here, check that a Checker instance returns and reports expected data and errors, such as error-messages and stats - at least in a exit-ok (0) situation. On a personal note, as we are copying more data across process boundaries, I suspect that the memory implications of this might cause issues for large projects already running with -jN and duplicate code detection on. That said, given that it takes a long time to perform lints of large code bases that is an issue for the [near?] future and likely to be part of the performance work. Either way but let's get it working first and deal with memory and perforamnce considerations later - I say this as there are many quick wins we can make here, e.g. file-batching, hashing lines, data compression and so on.
1 parent a7d4e47 commit bcfd6e7

File tree

6 files changed

+318
-33
lines changed

6 files changed

+318
-33
lines changed

pylint/checkers/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
# Copyright (c) 2015 Ionel Cristian Maries <[email protected]>
88
# Copyright (c) 2016 Moises Lopez <[email protected]>
99
# Copyright (c) 2017-2018 Bryce Guinta <[email protected]>
10+
# Copyright (c) 2020 Frank Harrison <[email protected]>
1011

1112
# Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
1213
# For details: https://github.com/PyCQA/pylint/blob/master/COPYING
@@ -40,6 +41,7 @@
4041
"""
4142

4243
from pylint.checkers.base_checker import BaseChecker, BaseTokenChecker
44+
from pylint.checkers.mapreduce_checker import MapReduceMixin
4345
from pylint.utils import register_plugins
4446

4547

pylint/checkers/mapreduce_checker.py

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Copyright (c) 2020 Frank Harrison <[email protected]>
2+
3+
# Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
4+
# For details: https://github.com/PyCQA/pylint/blob/master/COPYING
5+
import abc
6+
7+
8+
class MapReduceMixin(metaclass=abc.ABCMeta):
    """A mixin designed to allow multiprocess/threaded runs of a Checker.

    Checkers inheriting this mixin can run their per-file analysis in worker
    processes (``map``) and then consolidate the collected data in the parent
    process (``reduce``), e.g. for checks that must compare data across files.
    """

    @abc.abstractmethod
    def get_map_data(self):
        """Return mergeable/reducible data gathered by this checker's run.

        Called in each worker after its files have been checked; the returned
        value must be picklable so it can cross the process boundary.
        """

    @classmethod
    @abc.abstractmethod
    def reduce_map_data(cls, linter, data):
        """Receive and consolidate the map data from all mapped runs.

        :param linter: the parent-process linter, used for reporting.
        :param data: list of the values returned by ``get_map_data`` across
            all workers/files for this checker.
        """

pylint/checkers/similar.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
import astroid
2727

28-
from pylint.checkers import BaseChecker, table_lines_from_stats
28+
from pylint.checkers import BaseChecker, MapReduceMixin, table_lines_from_stats
2929
from pylint.interfaces import IRawChecker
3030
from pylint.reporters.ureports.nodes import Table
3131
from pylint.utils import decoding_stream
@@ -292,7 +292,7 @@ def report_similarities(sect, stats, old_stats):
292292

293293

294294
# wrapper to get a pylint checker from the similar class
295-
class SimilarChecker(BaseChecker, Similar):
295+
class SimilarChecker(BaseChecker, Similar, MapReduceMixin):
296296
"""checks for similarities and duplicated code. This computation may be
297297
memory / CPU intensive, so you should disable it if you experiment some
298298
problems.

pylint/lint.py

+54-8
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
# Copyright (c) 2018 Yuval Langer <[email protected]>
4040
# Copyright (c) 2018 Nick Drozd <[email protected]>
4141
# Copyright (c) 2018 kapsh <[email protected]>
42+
# Copyright (c) 2020 Frank Harrison <[email protected]>
4243

4344
# Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
4445
# For details: https://github.com/PyCQA/pylint/blob/master/COPYING
@@ -1280,14 +1281,18 @@ def _report_evaluation(self):
12801281

12811282

12821283
def check_parallel(linter, jobs, files, arguments=None):
1283-
"""Use the given linter to lint the files with given amount of workers (jobs)"""
1284-
# The reporter does not need to be passed to worker processess, i.e. the reporter does
1284+
"""Use the given linter to lint the files with given amount of workers (jobs)
1285+
1286+
This splits the work filestream-by-filestream. If you need to do work across
1287+
multiple files, as in the similarity-checker, then inherit from MapReduceMixin and
1288+
implement the map/reduce mixin functionality """
1289+
# The reporter does not need to be passed to worker processes, i.e. the reporter does
12851290
# not need to be pickleable
12861291
original_reporter = linter.reporter
12871292
linter.reporter = None
12881293

12891294
# The linter is inherited by all the pool's workers, i.e. the linter
1290-
# is identical to the linter object here. This is requred so that
1295+
# is identical to the linter object here. This is required so that
12911296
# a custom PyLinter object can be used.
12921297
initializer = functools.partial(_worker_initialize, arguments=arguments)
12931298
with multiprocessing.Pool(jobs, initializer=initializer, initargs=[linter]) as pool:
@@ -1298,19 +1303,29 @@ def check_parallel(linter, jobs, files, arguments=None):
12981303
linter.open()
12991304

13001305
all_stats = []
1301-
1302-
# Maps each file to be worked on by a single _worker_check_single_file() call
1303-
for module, messages, stats, msg_status in pool.imap_unordered(
1304-
_worker_check_single_file, files
1305-
):
1306+
all_mapreduce_data = collections.defaultdict(list)
1307+
1308+
# Maps each file to be worked on by a single _worker_check_single_file() call,
1309+
# collecting any map/reduce data by checker module so that we can 'reduce' it
1310+
# later.
1311+
for (
1312+
worker_idx, # used to merge map/reduce data across workers
1313+
module,
1314+
messages,
1315+
stats,
1316+
msg_status,
1317+
mapreduce_data,
1318+
) in pool.imap_unordered(_worker_check_single_file, files):
13061319
linter.set_current_module(module)
13071320
for msg in messages:
13081321
msg = Message(*msg)
13091322
linter.reporter.handle_message(msg)
13101323

13111324
all_stats.append(stats)
1325+
all_mapreduce_data[worker_idx].append(mapreduce_data)
13121326
linter.msg_status |= msg_status
13131327

1328+
_merge_mapreduce_data(linter, all_mapreduce_data)
13141329
linter.stats = _merge_stats(all_stats)
13151330

13161331
# Insert stats data to local checkers.
@@ -1343,15 +1358,46 @@ def _worker_check_single_file(file_item):
13431358
_worker_linter.open()
13441359
_worker_linter.check_single_file(name, filepath, modname)
13451360

1361+
mapreduce_data = collections.defaultdict(list)
1362+
for checker in _worker_linter.get_checkers():
1363+
try:
1364+
data = checker.get_map_data()
1365+
except AttributeError:
1366+
continue
1367+
mapreduce_data[checker.name].append(data)
1368+
13461369
msgs = [_get_new_args(m) for m in _worker_linter.reporter.messages]
13471370
return (
1371+
id(multiprocessing.current_process()),
13481372
_worker_linter.current_name,
13491373
msgs,
13501374
_worker_linter.stats,
13511375
_worker_linter.msg_status,
1376+
mapreduce_data,
13521377
)
13531378

13541379

1380+
def _merge_mapreduce_data(linter, all_mapreduce_data):
1381+
""" Merges map/reduce data across workers, invoking relevant APIs on checkers """
1382+
# First collate the data, preparing it so we can send it to the checkers for
1383+
# validation. The intent here is to collect all the mapreduce data for all checker-
1384+
# runs across processes - that will then be passed to a static method on the
1385+
# checkers to be reduced and further processed.
1386+
collated_map_reduce_data = collections.defaultdict(list)
1387+
for linter_data in all_mapreduce_data.values():
1388+
for run_data in linter_data:
1389+
for checker_name, data in run_data.items():
1390+
collated_map_reduce_data[checker_name].extend(data)
1391+
1392+
# Send the data to checkers that support/require consolidated data
1393+
original_checkers = linter.get_checkers()
1394+
for checker in original_checkers:
1395+
if checker.name in collated_map_reduce_data:
1396+
# Assume that if the check has returned map/reduce data that it has the
1397+
# reducer function
1398+
checker.reduce_map_data(linter, collated_map_reduce_data[checker.name])
1399+
1400+
13551401
# some reporting functions ####################################################
13561402

13571403

0 commit comments

Comments
 (0)