|
1 | 1 | from __future__ import absolute_import
|
2 | 2 |
|
3 | 3 | import functools
|
| 4 | +import logging |
4 | 5 | import posixpath
|
5 | 6 | import six
|
6 | 7 |
|
7 | 8 | from threading import Lock
|
8 | 9 |
|
9 | 10 | import rb
|
| 11 | +import rediscluster |
10 | 12 | from pkg_resources import resource_string
|
11 | 13 | from redis.client import Script
|
12 | 14 | from redis.connection import ConnectionPool
|
|
17 | 19 | from sentry.utils.warnings import DeprecatedSettingWarning
|
18 | 20 | from sentry.utils.versioning import Version, check_versions
|
19 | 21 |
|
| 22 | +logger = logging.getLogger(__name__) |
| 23 | + |
20 | 24 | _pool_cache = {}
|
21 | 25 | _pool_lock = Lock()
|
22 | 26 |
|
@@ -53,27 +57,76 @@ def make_rb_cluster(*args, **kwargs):
|
53 | 57 | return _make_rb_cluster(*args, **kwargs)
|
54 | 58 |
|
55 | 59 |
|
| 60 | +class _RBCluster(object): |
| 61 | + def supports(self, config): |
| 62 | + return not config.get('is_redis_cluster', False) |
| 63 | + |
| 64 | + def factory(self, **config): |
| 65 | + # rb expects a dict of { host, port } dicts where the key is the host |
| 66 | + # ID. Coerce the configuration into the correct format if necessary. |
| 67 | + hosts = config['hosts'] |
| 68 | + hosts = {k: v for k, v in enumerate(hosts)} if isinstance(hosts, list) else hosts |
| 69 | + config['hosts'] = hosts |
| 70 | + |
| 71 | + return _make_rb_cluster(**config) |
| 72 | + |
| 73 | + def __str__(self): |
| 74 | + return 'Redis Blaster Cluster' |
| 75 | + |
| 76 | + |
| 77 | +class _RedisCluster(object): |
| 78 | + def supports(self, config): |
| 79 | + return config.get('is_redis_cluster', False) |
| 80 | + |
| 81 | + def factory(self, **config): |
| 82 | + # StrictRedisCluster expects a list of { host, port } dicts. Coerce the |
| 83 | + # configuration into the correct format if necessary. |
| 84 | + hosts = config.get('hosts') |
| 85 | + hosts = hosts.values() if isinstance(hosts, dict) else hosts |
| 86 | + |
| 87 | + # Redis cluster does not wait to attempt to connect, we don't want the |
| 88 | + # application to fail to boot because of this, raise a KeyError |
| 89 | + try: |
| 90 | + return rediscluster.StrictRedisCluster(startup_nodes=hosts, decode_responses=True) |
| 91 | + except rediscluster.exceptions.RedisClusterException: |
| 92 | + logger.warning('Failed to connect to Redis Cluster', exc_info=True) |
| 93 | + raise KeyError('Redis Cluster could not be initalized') |
| 94 | + |
| 95 | + def __str__(self): |
| 96 | + return 'Redis Cluster' |
| 97 | + |
| 98 | + |
56 | 99 | class ClusterManager(object):
|
57 |
| - def __init__(self, options_manager): |
| 100 | + def __init__(self, options_manager, cluster_type=_RBCluster): |
58 | 101 | self.__clusters = {}
|
59 | 102 | self.__options_manager = options_manager
|
| 103 | + self.__cluster_type = cluster_type() |
60 | 104 |
|
61 | 105 | def get(self, key):
|
62 | 106 | cluster = self.__clusters.get(key)
|
63 | 107 |
|
64 |
| - if cluster is None: |
65 |
| - # TODO: This would probably be safer with a lock, but I'm not sure |
66 |
| - # that it's necessary. |
67 |
| - configuration = self.__options_manager.get('redis.clusters').get(key) |
68 |
| - if configuration is None: |
69 |
| - raise KeyError('Invalid cluster name: {}'.format(key)) |
| 108 | + if cluster: |
| 109 | + return cluster |
| 110 | + |
| 111 | + # TODO: This would probably be safer with a lock, but I'm not sure |
| 112 | + # that it's necessary. |
| 113 | + configuration = self.__options_manager.get('redis.clusters').get(key) |
| 114 | + if configuration is None: |
| 115 | + raise KeyError('Invalid cluster name: {}'.format(key)) |
| 116 | + |
| 117 | + if not self.__cluster_type.supports(configuration): |
| 118 | + raise KeyError('Invalid cluster type, expected: {}'.format(self.__cluster_type)) |
70 | 119 |
|
71 |
| - cluster = self.__clusters[key] = _make_rb_cluster(**configuration) |
| 120 | + cluster = self.__clusters[key] = self.__cluster_type.factory(**configuration) |
72 | 121 |
|
73 | 122 | return cluster
|
74 | 123 |
|
75 | 124 |
|
| 125 | +# TODO(epurkhiser): When migration of all rb cluster to true redis clusters has |
| 126 | +# completed, remove the rb ``clusters`` module variable and rename |
| 127 | +# redis_clusters to clusters. |
76 | 128 | clusters = ClusterManager(options.default_manager)
|
| 129 | +redis_clusters = ClusterManager(options.default_manager, _RedisCluster) |
77 | 130 |
|
78 | 131 |
|
79 | 132 | def get_cluster_from_options(setting, options, cluster_manager=clusters):
|
|
0 commit comments