Skip to content

Commit 653d60d

Browse files
committed
Initial commit.
0 parents  commit 653d60d

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

51 files changed

+4415
-0
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
.idea
2+
*.pyc
3+
rd_service/settings.py

rd_service/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+

rd_service/cli.py

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import argparse
2+
import logging
3+
import urlparse
4+
import redis
5+
import time
6+
import settings
7+
import data
8+
9+
10+
def start_workers(data_manager):
    """Start the worker threads and keep queries refreshed until interrupted.

    Spawns the configured number of workers via the data manager, then loops
    forever, asking the manager to re-queue stale queries once a minute.
    A KeyboardInterrupt (Ctrl-C) triggers an orderly worker shutdown.

    :param data_manager: data.Manager instance that owns the worker pool.
    """
    try:
        data_manager.start_workers(settings.WORKERS_COUNT, settings.CONNECTION_STRING,
                                   settings.MAX_CONNECTIONS)
        logging.info("Workers started.")

        while True:
            try:
                data_manager.refresh_queries()
            except Exception:
                # logging.exception records the traceback (logging.error here
                # silently dropped it), so refresh failures are diagnosable.
                logging.exception("Something went wrong with refreshing queries...")
            # NOTE(review): sleep placement assumed to be at loop level (the
            # extraction flattened indentation) -- refresh is attempted once
            # per minute whether or not the previous attempt failed.
            time.sleep(60)
    except KeyboardInterrupt:
        logging.warning("Exiting; waiting for threads")
        data_manager.stop_workers()
25+
26+
27+
if __name__ == '__main__':
    # Route all log records to stderr and log everything at DEBUG and above.
    channel = logging.StreamHandler()
    logging.getLogger().addHandler(channel)
    logging.getLogger().setLevel("DEBUG")

    # Single positional argument chooses the subcommand ("worker" is the only
    # one handled below).
    parser = argparse.ArgumentParser()
    parser.add_argument("command")
    args = parser.parse_args()

    # Redis connection parameters are taken from a URL (Python 2 urlparse);
    # database 0 is hard-coded.
    url = urlparse.urlparse(settings.REDIS_URL)
    redis_connection = redis.StrictRedis(host=url.hostname, port=url.port, db=0, password=url.password)
    data_manager = data.Manager(redis_connection, settings.INTERNAL_DB_CONNECTION_STRING, settings.MAX_CONNECTIONS)

    if args.command == "worker":
        start_workers(data_manager)
    else:
        # Python 2 print statement; any other command is reported and ignored.
        print "Unknown command"
44+

rd_service/data/__init__.py

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from manager import Manager
2+
from worker import Job
3+
import models
4+
import utils

rd_service/data/manager.py

+171
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
import collections
2+
from contextlib import contextmanager
3+
import json
4+
import logging
5+
import psycopg2
6+
import psycopg2.pool
7+
import qr
8+
import redis
9+
import time
10+
import query_runner
11+
import worker
12+
from utils import gen_query_hash
13+
14+
15+
class QueryResult(collections.namedtuple('QueryData', 'id query data runtime retrieved_at query_hash')):
    """Immutable record describing one stored query result row."""

    def to_dict(self, parse_data=False):
        """Return the record as a dict.

        :param parse_data: when True and the data field is non-empty, decode
            it from its JSON string form into Python objects.
        """
        as_dict = self._asdict()
        raw_data = as_dict['data']

        if parse_data and raw_data:
            as_dict['data'] = json.loads(raw_data)

        return as_dict
23+
24+
25+
class Manager(object):
    """Coordinates query execution.

    Owns the redis-backed priority job queue, the PostgreSQL connection pool
    used for the internal metadata/results database, and the pool of worker
    threads that actually run queries.
    """

    def __init__(self, redis_connection, db_connection_string, db_max_connections):
        self.redis_connection = redis_connection
        self.workers = []
        self.db_connection_pool = psycopg2.pool.ThreadedConnectionPool(1, db_max_connections,
                                                                       db_connection_string)
        self.queue = qr.PriorityQueue("jobs")
        # Maximum optimistic-locking retries in add_job().
        self.max_retries = 5

    # TODO: Use our Django Models
    def get_query_result_by_id(self, query_result_id):
        """Fetch a stored query result by primary key.

        :returns: a QueryResult namedtuple, or None when no row matches.
        """
        with self.db_transaction() as cursor:
            sql = "SELECT id, query, data, runtime, retrieved_at, query_hash FROM query_results " \
                  "WHERE id=%s LIMIT 1"
            cursor.execute(sql, (query_result_id,))
            query_result = cursor.fetchone()

        if query_result:
            query_result = QueryResult(*query_result)

        return query_result

    def get_query_result(self, query, ttl=0):
        """Look up a cached result for *query* subject to a ttl in seconds.

        :returns: a QueryResult namedtuple, or None when no row matches.
        """
        query_hash = gen_query_hash(query)

        with self.db_transaction() as cursor:
            # BUGFIX: the original concatenation had no space between
            # "interval '%s second'" and "ORDER BY", yielding invalid SQL
            # ("... second'ORDER BY ..."). A trailing space is now included.
            # NOTE(review): the '<' comparison returns results *older* than
            # the ttl window, which looks inverted for a cache lookup --
            # confirm intent against callers before flipping it.
            # ttl is injected via AsIs (not escaped); it must always come
            # from trusted internal settings, never user input.
            sql = "SELECT id, query, data, runtime, retrieved_at, query_hash FROM query_results " \
                  "WHERE query_hash=%s " \
                  "AND retrieved_at < now() at time zone 'utc' - interval '%s second' " \
                  "ORDER BY retrieved_at DESC LIMIT 1"
            cursor.execute(sql, (query_hash, psycopg2.extensions.AsIs(ttl)))
            query_result = cursor.fetchone()

        if query_result:
            query_result = QueryResult(*query_result)

        return query_result

    def add_job(self, query, priority):
        """Enqueue a job for *query*, deduplicating on the query hash.

        Uses redis WATCH/MULTI optimistic locking so that concurrent callers
        for the same query hash either reuse the existing job or exactly one
        of them creates a new one. Retries up to self.max_retries times on
        contention.

        :returns: the worker.Job (new or existing), or None if all retries
            were exhausted.
        """
        query_hash = gen_query_hash(query)
        logging.info("[Manager][%s] Inserting job with priority=%s", query_hash, priority)
        try_count = 0
        job = None

        while try_count < self.max_retries:
            try_count += 1

            pipe = self.redis_connection.pipeline()
            try:
                pipe.watch('query_hash_job:%s' % query_hash)
                job_id = pipe.get('query_hash_job:%s' % query_hash)
                if job_id:
                    # A job for this query already exists -- reuse it.
                    logging.info("[Manager][%s] Found existing job: %s", query_hash, job_id)
                    job = worker.Job.load(self, job_id)
                else:
                    job = worker.Job(self, query, priority)
                    pipe.multi()
                    job.save(pipe)
                    logging.info("[Manager][%s] Created new job: %s", query_hash, job.id)
                    self.queue.push(job.id, job.priority)
                break

            except redis.WatchError:
                # Another client touched the watched key; retry the whole
                # check-and-create sequence.
                continue

        if not job:
            logging.error("[Manager][%s] Failed adding job for query.", query_hash)

        return job

    def refresh_queries(self):
        """Re-enqueue (at low priority) every query whose newest result is
        older than the smallest positive ttl configured for it."""
        sql = """SELECT queries.query, queries.ttl, retrieved_at
                 FROM (SELECT query, min(ttl) as ttl FROM queries WHERE ttl > 0 GROUP by query) queries
                 JOIN (SELECT query, max(retrieved_at) as retrieved_at
                       FROM query_results
                       GROUP BY query) query_results on query_results.query=queries.query
                 WHERE queries.ttl > 0
                 AND query_results.retrieved_at + ttl * interval '1 second' < now() at time zone 'utc';"""

        queries = self.run_query(sql)
        for query, ttl, retrieved_at in queries:
            self.add_job(query, worker.Job.LOW_PRIORITY)

    def store_query_result(self, query, data, run_time, retrieved_at):
        """Insert a new query result row and point matching queries at it.

        :returns: the new query_results row id, or None when the insert
            failed.
        """
        query_result_id = None
        query_hash = gen_query_hash(query)
        sql = "INSERT INTO query_results (query_hash, query, data, runtime, retrieved_at) " \
              "VALUES (%s, %s, %s, %s, %s) RETURNING id"
        with self.db_transaction() as cursor:
            cursor.execute(sql, (query_hash, query, data, run_time, retrieved_at))
            if cursor.rowcount == 1:
                query_result_id = cursor.fetchone()[0]
                logging.info("[Manager][%s] Inserted query data; id=%s", query_hash, query_result_id)

                # Keep every query with this hash pointing at its latest data.
                sql = "UPDATE queries SET latest_query_data_id=%s WHERE query_hash=%s"
                cursor.execute(sql, (query_result_id, query_hash))

                logging.info("[Manager][%s] Updated %s queries.", query_hash, cursor.rowcount)
            else:
                logging.error("[Manager][%s] Failed inserting query data.", query_hash)
        return query_result_id

    def run_query(self, *args):
        """Run a raw SQL statement against the internal database.

        args[0] is the SQL text; any remaining args are bound parameters.
        :returns: a list of rows for SELECT-like statements, otherwise the
            affected rowcount.
        """
        sql = args[0]
        logging.debug("running query: %s %s", sql, args[1:])

        with self.db_transaction() as cursor:
            cursor.execute(sql, args[1:])
            if cursor.description:
                data = list(cursor)
            else:
                data = cursor.rowcount

        return data

    def start_workers(self, workers_count, connection_string, max_connections):
        """Start *workers_count* worker threads sharing one query runner.

        Idempotent: if workers are already running, the existing list is
        returned unchanged.
        """
        if self.workers:
            return self.workers

        # TODO: who closes the connection pool?
        pg_connection_pool = psycopg2.pool.ThreadedConnectionPool(1, max_connections, connection_string)
        runner = query_runner.redshift(pg_connection_pool)

        self.workers = [worker.Worker(self, runner) for i in range(workers_count)]
        for w in self.workers:
            w.start()

        return self.workers

    def stop_workers(self):
        """Signal all workers to stop and block until each thread exits."""
        for w in self.workers:
            w.continue_working = False
            w.join()

    @contextmanager
    def db_transaction(self):
        """Yield a cursor from the pool; commit on success, rollback on any
        exception, and always return the connection to the pool."""
        connection = self.db_connection_pool.getconn()
        cursor = connection.cursor()
        try:
            yield cursor
        except:
            connection.rollback()
            raise
        else:
            connection.commit()
        finally:
            self.db_connection_pool.putconn(connection)

rd_service/data/models.py

+137
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import json
2+
from django.db import models
3+
from django.template.defaultfilters import slugify
4+
import utils
5+
6+
7+
class QueryResult(models.Model):
    """Stored output of one query execution (table: query_results)."""
    id = models.AutoField(primary_key=True)
    # Hash of the query text used as the cache lookup key (computed elsewhere
    # via utils.gen_query_hash -- see Query.save).
    query_hash = models.CharField(max_length=32)
    query = models.TextField()
    # JSON-encoded result payload; decoded in to_dict().
    data = models.TextField()
    # Execution time of the query; presumably seconds -- TODO confirm with
    # the code that writes this field.
    runtime = models.FloatField()
    retrieved_at = models.DateTimeField()

    class Meta:
        app_label = 'redash'
        db_table = 'query_results'

    def to_dict(self):
        """Serialize the row, decoding the JSON data payload."""
        return {
            'id': self.id,
            'query_hash': self.query_hash,
            'query': self.query,
            'data': json.loads(self.data),
            'runtime': self.runtime,
            'retrieved_at': self.retrieved_at
        }

    def __unicode__(self):
        # Python 2 string representation: "<id> | <hash> | <retrieved_at>".
        return u"%d | %s | %s" % (self.id, self.query_hash, self.retrieved_at)
31+
32+
33+
class Query(models.Model):
    """A saved query definition (table: queries)."""
    id = models.AutoField(primary_key=True)
    # Newest QueryResult for this query; updated by the result-storage path.
    latest_query_data = models.ForeignKey(QueryResult)
    name = models.CharField(max_length=255)
    description = models.CharField(max_length=4096)
    query = models.TextField()
    # Derived from `query` on every save() -- never set directly.
    query_hash = models.CharField(max_length=32)
    # Cache lifetime in seconds; rows with ttl > 0 participate in the
    # periodic refresh (see the manager's refresh logic).
    ttl = models.IntegerField()
    user = models.CharField(max_length=360)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        app_label = 'redash'
        db_table = 'queries'

    def to_dict(self, with_result=True):
        """Serialize the query; optionally embed the latest result row.

        :param with_result: include 'latest_query_data' (decoded) when a
            latest result exists.
        """
        d = {
            'id': self.id,
            'latest_query_data_id': self.latest_query_data_id,
            'name': self.name,
            'description': self.description,
            'query': self.query,
            'query_hash': self.query_hash,
            'ttl': self.ttl,
            'user': self.user,
            'created_at': self.created_at,
        }

        if with_result and self.latest_query_data_id:
            d['latest_query_data'] = self.latest_query_data.to_dict()

        return d

    def save(self, *args, **kwargs):
        # Recompute the hash from the query text so query_hash can never go
        # stale relative to `query`.
        self.query_hash = utils.gen_query_hash(self.query)
        super(Query, self).save(*args, **kwargs)

    def __unicode__(self):
        return unicode(self.id)
72+
73+
74+
class Dashboard(models.Model):
    """A dashboard: a named, user-owned grid of widgets (table: dashboards)."""
    id = models.AutoField(primary_key=True)
    # URL-friendly identifier; auto-generated from `name` on first save.
    slug = models.CharField(max_length=140)
    name = models.CharField(max_length=100)
    user = models.CharField(max_length=360)
    # JSON-encoded layout; to_dict() treats it as rows of widget ids.
    layout = models.TextField()
    is_archived = models.BooleanField(default=False)

    class Meta:
        app_label = 'redash'
        db_table = 'dashboards'

    def to_dict(self, with_widgets=False):
        """Serialize the dashboard.

        :param with_widgets: when True, replace each widget id in the layout
            with the widget's serialized dict (None for unknown ids); when
            False, 'widgets' is None.
        """
        layout = json.loads(self.layout)

        if with_widgets:
            widgets = {w.id: w.to_dict() for w in self.widgets.all()}
            widgets_layout = map(lambda row: map(lambda widget_id: widgets.get(widget_id, None), row), layout)
        else:
            widgets_layout = None

        return {
            'id': self.id,
            'slug': self.slug,
            'name': self.name,
            'user': self.user,
            'layout': layout,
            'widgets': widgets_layout
        }

    def save(self, *args, **kwargs):
        # TODO: make sure slug is unique
        if not self.slug:
            self.slug = slugify(self.name)
        super(Dashboard, self).save(*args, **kwargs)

    def __unicode__(self):
        return u"%s=%s" % (self.id, self.name)
112+
113+
114+
class Widget(models.Model):
    """A single visualization placed on a dashboard (table: widgets)."""
    id = models.AutoField(primary_key=True)
    query = models.ForeignKey(Query)
    # Visualization type identifier; semantics live in the frontend --
    # TODO confirm the allowed values.
    type = models.CharField(max_length=100)
    width = models.IntegerField()
    # JSON-encoded visualization options; decoded in to_dict().
    options = models.TextField()
    dashboard = models.ForeignKey(Dashboard, related_name='widgets')

    class Meta:
        app_label = 'redash'
        db_table = 'widgets'

    def to_dict(self):
        """Serialize the widget, embedding its query and decoding options."""
        return {
            'id': self.id,
            'query': self.query.to_dict(),
            'type': self.type,
            'width': self.width,
            'options': json.loads(self.options),
            'dashboard_id': self.dashboard_id
        }

    def __unicode__(self):
        return u"%s=>%s" % (self.id, self.dashboard_id)

0 commit comments

Comments
 (0)