From 467482f942e6820870a70c32551fe413494204b6 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 31 Aug 2016 16:01:34 -0700 Subject: [PATCH 1/6] Add simulated analytics DB for BQ DML samples. This generates a random MySQL database (tables defined in create_sample_db.sql) with random user actions. The reason for putting these into a SQL database instead of directly into BigQuery is that it will be used to show export form a SQL database into BigQuery. Hopefully the limits on query sizes for BigQuery are large enough that this works for larger databases. Change-Id: I446c3af72dab60d9ed79a2c814a68f05801ae17b --- bigquery/dml/.gitignore | 2 + bigquery/dml/create_sample_db.sql | 109 +++++++++++++ bigquery/dml/simulate_actions.py | 244 ++++++++++++++++++++++++++++++ 3 files changed, 355 insertions(+) create mode 100644 bigquery/dml/.gitignore create mode 100644 bigquery/dml/create_sample_db.sql create mode 100755 bigquery/dml/simulate_actions.py diff --git a/bigquery/dml/.gitignore b/bigquery/dml/.gitignore new file mode 100644 index 00000000000..ac51be62af4 --- /dev/null +++ b/bigquery/dml/.gitignore @@ -0,0 +1,2 @@ +# Includes password. +config.yaml diff --git a/bigquery/dml/create_sample_db.sql b/bigquery/dml/create_sample_db.sql new file mode 100644 index 00000000000..0d160bf7764 --- /dev/null +++ b/bigquery/dml/create_sample_db.sql @@ -0,0 +1,109 @@ +-- MySQL dump 10.13 Distrib 5.7.14, for osx10.11 (x86_64) +-- +-- Host: 127.0.0.1 Database: sample_db +-- ------------------------------------------------------ +-- Server version 5.7.14 + +/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */; +/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */; +/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */; +/*!40101 SET NAMES utf8 */; +/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */; +/*!40103 SET TIME_ZONE='+00:00' */; +/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */; +/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */; +/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */; +/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */; + +-- +-- Table structure for table `UserActions` +-- + +DROP TABLE IF EXISTS `UserActions`; +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!40101 SET character_set_client = utf8 */; +CREATE TABLE `UserActions` ( + `ActionID` int(11) NOT NULL AUTO_INCREMENT, + `SessionID` varchar(36) DEFAULT NULL, + `UserID` int(11) NOT NULL, + `ActionType` varchar(64) NOT NULL DEFAULT '', + `ActionTime` timestamp NULL DEFAULT CURRENT_TIMESTAMP, + `Message` longtext, + PRIMARY KEY (`ActionID`), + KEY `SessionID` (`SessionID`), + KEY `UserID` (`UserID`), + CONSTRAINT `useractions_ibfk_2` FOREIGN KEY (`SessionID`) REFERENCES `UserSessions` (`SessionID`), + CONSTRAINT `useractions_ibfk_3` FOREIGN KEY (`UserID`) REFERENCES `Users` (`UserID`) +) ENGINE=InnoDB AUTO_INCREMENT=667621 DEFAULT CHARSET=utf8; +/*!40101 SET character_set_client = @saved_cs_client */; + +-- +-- Dumping data for table `UserActions` +-- + +LOCK TABLES `UserActions` WRITE; +/*!40000 ALTER TABLE `UserActions` DISABLE KEYS */; +/*!40000 ALTER TABLE `UserActions` ENABLE KEYS */; +UNLOCK TABLES; + +-- +-- Table structure for table `UserSessions` +-- + +DROP TABLE IF EXISTS `UserSessions`; +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!40101 SET character_set_client = utf8 */; +CREATE TABLE `UserSessions` ( + `SessionID` varchar(36) NOT NULL DEFAULT '', + `LoginTime` timestamp NULL DEFAULT NULL, + `LogoutTime` timestamp NULL DEFAULT NULL, + `IPAddress` varchar(40) DEFAULT '', + `UserID` int(11) DEFAULT NULL, + PRIMARY KEY (`SessionID`), + KEY `UserID` (`UserID`), + CONSTRAINT `usersessions_ibfk_1` FOREIGN KEY (`UserID`) REFERENCES `Users` (`UserID`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; +/*!40101 SET character_set_client = @saved_cs_client */; + +-- +-- Dumping data for table `UserSessions` +-- + +LOCK TABLES `UserSessions` WRITE; +/*!40000 ALTER TABLE `UserSessions` DISABLE KEYS */; +/*!40000 ALTER TABLE `UserSessions` ENABLE KEYS */; +UNLOCK TABLES; + +-- +-- Table structure for table `Users` +-- + +DROP TABLE IF EXISTS `Users`; +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!40101 SET character_set_client = utf8 */; +CREATE TABLE `Users` ( + `UserID` int(11) NOT NULL, + `DateJoined` timestamp NULL DEFAULT NULL, + PRIMARY KEY (`UserID`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; +/*!40101 SET character_set_client = @saved_cs_client */; + +-- +-- Dumping data for table `Users` +-- + +LOCK TABLES `Users` WRITE; +/*!40000 ALTER TABLE `Users` DISABLE KEYS */; +/*!40000 ALTER TABLE `Users` ENABLE KEYS */; +UNLOCK TABLES; +/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */; + +/*!40101 SET SQL_MODE=@OLD_SQL_MODE */; +/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */; +/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */; +/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */; +/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */; +/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */; +/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */; + +-- Dump completed on 2016-09-02 12:39:19 diff --git a/bigquery/dml/simulate_actions.py b/bigquery/dml/simulate_actions.py new file mode 100755 index 00000000000..5ad0e3fd23d --- /dev/null +++ b/bigquery/dml/simulate_actions.py @@ -0,0 +1,244 @@ +#!/usr/bin/env python + +# Copyright 2016 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Command-line tool to simulate user actions and write to SQL database. +""" + +from __future__ import division + +import argparse +import collections +import datetime +import random +import uuid + +import pymysql + + +SECONDS_IN_DAY = 24 * 60 * 60 +SECONDS_IN_2016 = 366 * SECONDS_IN_DAY + +# Unix timestamp for the beginning of 2016. +# http://stackoverflow.com/a/19801806/101923 +TIMESTAMP_2016 = ( + datetime.datetime(2016, 1, 1, 0, 0, 0) - + datetime.datetime.fromtimestamp(0)).total_seconds() + + +User = collections.namedtuple('User', ['id', 'date_joined']) + +UserSession = collections.namedtuple( + 'UserSession', + ['id', 'login', 'logout', 'user_id', 'ip_address']) + +UserAction = collections.namedtuple( + 'UserAction', + ['session_id', 'user_id', 'type', 'time', 'message']) + + +def generate_users(num_users): + users = [] + + for userid in range(num_users): + # Add more users at the end of 2016 than the beginning. + # https://en.wikipedia.org/wiki/Beta_distribution + year_portion = random.betavariate(3, 1) + date_joined = datetime.datetime.fromtimestamp( + TIMESTAMP_2016 + SECONDS_IN_2016 * year_portion) + users.append(User(userid, date_joined)) + + return users + + +def insert_users(connection, users): + """Inserts rows into the Users table.""" + + with connection.cursor() as cursor: + cursor.execute('DELETE FROM `UserActions`') + cursor.execute('DELETE FROM `UserSessions`') + cursor.execute('DELETE FROM `Users`') + + connection.commit() + + with connection.cursor() as cursor: + cursor.executemany( + 'INSERT INTO `Users` (`UserID`, `DateJoined`) VALUES (%s,%s)', + [(user.id, user.date_joined.isoformat(' ')) for user in users]) + + connection.commit() + + +def random_ip(): + """Choose a random example IP address. + + Examples are chosen from the test networks described in + https://tools.ietf.org/html/rfc5737 + """ + network = random.choice([ + '192.0.2', # RFC-5737 TEST-NET-1 + '198.51.100', # RFC-5737 TEST-NET-2 + '203.0.113', # RFC-5737 TEST-NET-3 + ]) + ip_address = '{}.{}'.format(network, random.randrange(256)) + return ip_address + + +def simulate_user_session(connection, user, previous_session=None): + """Simulates a single session (login to logout) of a user's history.""" + login_time = user.date_joined + + if previous_session is not None: + login_time = ( + previous_session.logout + + datetime.timedelta( + days=1, seconds=random.randrange(SECONDS_IN_DAY))) + + session_id = str(uuid.uuid4()) + previous_action_time = login_time + total_actions = random.randrange(10) + 1 + actions = [] + + for _ in range(total_actions): + action_type=random.choice(['CLICKED', 'PURCHASED']) + action_time=(previous_action_time + + datetime.timedelta(seconds=random.randrange(59) + 1)) + message='breed={}'.format( + random.choice([ + 'Albera', + 'Angus', + 'Beefalo', + 'Droughtmaster', + 'Longhorn', + 'Guernsey', + 'Highland', + 'Holstein', + 'Jersey', + 'Normande', + 'Shetland', + 'Wagyu', + ])) + action = UserAction( + session_id=session_id, + user_id=user.id, + type=action_type, + time=action_time, + message=message) + + previous_action_time = action_time + actions.append(action) + + logout_time = ( + previous_action_time + + datetime.timedelta(seconds=(1 + random.randrange(59)))) + + return ( + UserSession( + session_id, + login_time, + logout_time, + user.id, + random_ip()), + actions) + + +def simulate_user_history(connection, user): + """Simulates the entire history of activity for a single user.""" + total_sessions = random.randrange(10) + sessions = [] + actions = [] + previous_session = None + + for _ in range(total_sessions): + session, user_actions = simulate_user_session( + connection, user, previous_session) + sessions.append(session) + actions.extend(user_actions) + previous_session = session + + with connection.cursor() as cursor: + cursor.executemany( + 'INSERT INTO `UserSessions` ' + '(`SessionID`, ' + '`LoginTime`, ' + '`LogoutTime`, ' + '`UserID`, ' + '`IPAddress`) ' + 'VALUES (%s,%s,%s,%s,%s)', + [( + session.id, + session.login.isoformat(' '), + session.logout.isoformat(' '), + session.user_id, + session.ip_address, + ) for session in sessions]) + cursor.executemany( + 'INSERT INTO `UserActions` ' + '(`SessionID`, ' + '`UserID`, ' + '`ActionType`, ' + '`ActionTime`, ' + '`Message`) ' + 'VALUES (%s,%s,%s,%s,%s)', + [( + action.session_id, + action.user_id, + action.type, + action.time.isoformat(' '), + action.message, + ) for action in actions]) + + connection.commit() + + +def run_simulation(connection, users): + """Simulates app activity for all users.""" + + for n, user in enumerate(users): + + if n % 100 == 0 and n != 0: + print('Simulated data for {} users'.format(n)) + + simulate_user_history(connection, user) + + print('COMPLETE: Simulated data for {} users'.format(len(users))) + + +def main(total_users, host, user, password, db_name): + connection = pymysql.connect( + host=host, user=user, password=password, db=db_name) + + try: + users = generate_users(total_users) + insert_users(connection, users) + run_simulation(connection, users) + finally: + connection.close() + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument( + 'total_users', help='How many simulated users to create.', type=int) + parser.add_argument('host', help='Host of the database to write to.') + parser.add_argument('user', help='User to connect to the database.') + parser.add_argument('password', help='Password for the database user.') + parser.add_argument('db', help='Name of the database to write to.') + + args = parser.parse_args() + + main(args.total_users, args.host, args.user, args.password, args.db) From c2a86feb7b6d41499c2d8090138018d193f7b5c1 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 13 Sep 2016 11:20:11 -0700 Subject: [PATCH 2/6] Add tests to BigQuery DML samples. Convert to SQLAlchemy. This tests the SQL code to create the tables, as well as the Python code that creates the rows. Uses SQLAlchemy to abstract away differences between database engines. Change-Id: Id9e70eef56f5e203921b6c1f21708631f3a767f7 --- bigquery/dml/.gitignore | 3 +- bigquery/dml/README.md | 8 + bigquery/dml/create_sample_db.sql | 109 ---------- .../{simulate_actions.py => populate_db.py} | 195 +++++++++--------- bigquery/dml/populate_db_test.py | 36 ++++ bigquery/dml/requirements.txt | 5 + 6 files changed, 142 insertions(+), 214 deletions(-) create mode 100644 bigquery/dml/README.md delete mode 100644 bigquery/dml/create_sample_db.sql rename bigquery/dml/{simulate_actions.py => populate_db.py} (55%) create mode 100644 bigquery/dml/populate_db_test.py create mode 100644 bigquery/dml/requirements.txt diff --git a/bigquery/dml/.gitignore b/bigquery/dml/.gitignore index ac51be62af4..f4058b86817 100644 --- a/bigquery/dml/.gitignore +++ b/bigquery/dml/.gitignore @@ -1,2 +1 @@ -# Includes password. -config.yaml +sample_db_export.sql diff --git a/bigquery/dml/README.md b/bigquery/dml/README.md new file mode 100644 index 00000000000..401ae455da6 --- /dev/null +++ b/bigquery/dml/README.md @@ -0,0 +1,8 @@ +# BigQuery DML Samples + +To create a test database, run the `populate_db.py` script. + +``` +python populate_db.py 100 localhost root 'mysql-password' sample_db +``` + diff --git a/bigquery/dml/create_sample_db.sql b/bigquery/dml/create_sample_db.sql deleted file mode 100644 index 0d160bf7764..00000000000 --- a/bigquery/dml/create_sample_db.sql +++ /dev/null @@ -1,109 +0,0 @@ --- MySQL dump 10.13 Distrib 5.7.14, for osx10.11 (x86_64) --- --- Host: 127.0.0.1 Database: sample_db --- ------------------------------------------------------ --- Server version 5.7.14 - -/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */; -/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */; -/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */; -/*!40101 SET NAMES utf8 */; -/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */; -/*!40103 SET TIME_ZONE='+00:00' */; -/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */; -/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */; -/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */; -/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */; - --- --- Table structure for table `UserActions` --- - -DROP TABLE IF EXISTS `UserActions`; -/*!40101 SET @saved_cs_client = @@character_set_client */; -/*!40101 SET character_set_client = utf8 */; -CREATE TABLE `UserActions` ( - `ActionID` int(11) NOT NULL AUTO_INCREMENT, - `SessionID` varchar(36) DEFAULT NULL, - `UserID` int(11) NOT NULL, - `ActionType` varchar(64) NOT NULL DEFAULT '', - `ActionTime` timestamp NULL DEFAULT CURRENT_TIMESTAMP, - `Message` longtext, - PRIMARY KEY (`ActionID`), - KEY `SessionID` (`SessionID`), - KEY `UserID` (`UserID`), - CONSTRAINT `useractions_ibfk_2` FOREIGN KEY (`SessionID`) REFERENCES `UserSessions` (`SessionID`), - CONSTRAINT `useractions_ibfk_3` FOREIGN KEY (`UserID`) REFERENCES `Users` (`UserID`) -) ENGINE=InnoDB AUTO_INCREMENT=667621 DEFAULT CHARSET=utf8; -/*!40101 SET character_set_client = @saved_cs_client */; - --- --- Dumping data for table `UserActions` --- - -LOCK TABLES `UserActions` WRITE; -/*!40000 ALTER TABLE `UserActions` DISABLE KEYS */; -/*!40000 ALTER TABLE `UserActions` ENABLE KEYS */; -UNLOCK TABLES; - --- --- Table structure for table `UserSessions` --- - -DROP TABLE IF EXISTS `UserSessions`; -/*!40101 SET @saved_cs_client = @@character_set_client */; -/*!40101 SET character_set_client = utf8 */; -CREATE TABLE `UserSessions` ( - `SessionID` varchar(36) NOT NULL DEFAULT '', - `LoginTime` timestamp NULL DEFAULT NULL, - `LogoutTime` timestamp NULL DEFAULT NULL, - `IPAddress` varchar(40) DEFAULT '', - `UserID` int(11) DEFAULT NULL, - PRIMARY KEY (`SessionID`), - KEY `UserID` (`UserID`), - CONSTRAINT `usersessions_ibfk_1` FOREIGN KEY (`UserID`) REFERENCES `Users` (`UserID`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8; -/*!40101 SET character_set_client = @saved_cs_client */; - --- --- Dumping data for table `UserSessions` --- - -LOCK TABLES `UserSessions` WRITE; -/*!40000 ALTER TABLE `UserSessions` DISABLE KEYS */; -/*!40000 ALTER TABLE `UserSessions` ENABLE KEYS */; -UNLOCK TABLES; - --- --- Table structure for table `Users` --- - -DROP TABLE IF EXISTS `Users`; -/*!40101 SET @saved_cs_client = @@character_set_client */; -/*!40101 SET character_set_client = utf8 */; -CREATE TABLE `Users` ( - `UserID` int(11) NOT NULL, - `DateJoined` timestamp NULL DEFAULT NULL, - PRIMARY KEY (`UserID`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8; -/*!40101 SET character_set_client = @saved_cs_client */; - --- --- Dumping data for table `Users` --- - -LOCK TABLES `Users` WRITE; -/*!40000 ALTER TABLE `Users` DISABLE KEYS */; -/*!40000 ALTER TABLE `Users` ENABLE KEYS */; -UNLOCK TABLES; -/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */; - -/*!40101 SET SQL_MODE=@OLD_SQL_MODE */; -/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */; -/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */; -/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */; -/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */; -/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */; -/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */; - --- Dump completed on 2016-09-02 12:39:19 diff --git a/bigquery/dml/simulate_actions.py b/bigquery/dml/populate_db.py similarity index 55% rename from bigquery/dml/simulate_actions.py rename to bigquery/dml/populate_db.py index 5ad0e3fd23d..f0eac12869e 100755 --- a/bigquery/dml/simulate_actions.py +++ b/bigquery/dml/populate_db.py @@ -20,12 +20,14 @@ from __future__ import division import argparse -import collections import datetime import random import uuid -import pymysql +from six.moves.urllib import parse +import sqlalchemy +from sqlalchemy.ext import declarative +import sqlalchemy.orm SECONDS_IN_DAY = 24 * 60 * 60 @@ -38,49 +40,58 @@ datetime.datetime.fromtimestamp(0)).total_seconds() -User = collections.namedtuple('User', ['id', 'date_joined']) +Base = declarative.declarative_base() -UserSession = collections.namedtuple( - 'UserSession', - ['id', 'login', 'logout', 'user_id', 'ip_address']) -UserAction = collections.namedtuple( - 'UserAction', - ['session_id', 'user_id', 'type', 'time', 'message']) +class User(Base): + __tablename__ = 'Users' + id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True) + date_joined = sqlalchemy.Column(sqlalchemy.DateTime) -def generate_users(num_users): + +class UserSession(Base): + __tablename__ = 'UserSessions' + + id = sqlalchemy.Column(sqlalchemy.String(length=36), primary_key=True) + user_id = sqlalchemy.Column( + sqlalchemy.Integer, sqlalchemy.ForeignKey('Users.id')) + login_time = sqlalchemy.Column(sqlalchemy.DateTime) + logout_time = sqlalchemy.Column(sqlalchemy.DateTime) + ip_address = sqlalchemy.Column(sqlalchemy.String(length=40)) + + +class UserAction(Base): + __tablename__ = 'UserActions' + + id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True) + session_id = sqlalchemy.Column( + sqlalchemy.String(length=36), + sqlalchemy.ForeignKey('UserSessions.id')) + user_id = sqlalchemy.Column( + sqlalchemy.Integer, sqlalchemy.ForeignKey('Users.id')) + action_type = sqlalchemy.Column(sqlalchemy.String(length=64)) + action_time = sqlalchemy.Column(sqlalchemy.DateTime) + message = sqlalchemy.Column(sqlalchemy.Text) + + +def generate_users(session, num_users): users = [] - for userid in range(num_users): + for userid in range(1, num_users + 1): # Add more users at the end of 2016 than the beginning. # https://en.wikipedia.org/wiki/Beta_distribution year_portion = random.betavariate(3, 1) date_joined = datetime.datetime.fromtimestamp( TIMESTAMP_2016 + SECONDS_IN_2016 * year_portion) - users.append(User(userid, date_joined)) + user = User(id=userid, date_joined=date_joined) + users.append(user) + session.add(user) + session.commit() return users -def insert_users(connection, users): - """Inserts rows into the Users table.""" - - with connection.cursor() as cursor: - cursor.execute('DELETE FROM `UserActions`') - cursor.execute('DELETE FROM `UserSessions`') - cursor.execute('DELETE FROM `Users`') - - connection.commit() - - with connection.cursor() as cursor: - cursor.executemany( - 'INSERT INTO `Users` (`UserID`, `DateJoined`) VALUES (%s,%s)', - [(user.id, user.date_joined.isoformat(' ')) for user in users]) - - connection.commit() - - def random_ip(): """Choose a random example IP address. @@ -96,26 +107,33 @@ def random_ip(): return ip_address -def simulate_user_session(connection, user, previous_session=None): +def simulate_user_session(session, user, previous_user_session=None): """Simulates a single session (login to logout) of a user's history.""" login_time = user.date_joined - if previous_session is not None: + if previous_user_session is not None: login_time = ( - previous_session.logout + + previous_user_session.logout_time + datetime.timedelta( days=1, seconds=random.randrange(SECONDS_IN_DAY))) session_id = str(uuid.uuid4()) + user_session = UserSession( + id=session_id, + user_id=user.id, + login_time=login_time, + ip_address=random_ip()) + session.add(user_session) + session.commit() previous_action_time = login_time total_actions = random.randrange(10) + 1 - actions = [] for _ in range(total_actions): - action_type=random.choice(['CLICKED', 'PURCHASED']) - action_time=(previous_action_time + + action_type = random.choice(['CLICKED', 'PURCHASED']) + action_time = ( + previous_action_time + datetime.timedelta(seconds=random.randrange(59) + 1)) - message='breed={}'.format( + message = 'breed={}'.format( random.choice([ 'Albera', 'Angus', @@ -133,77 +151,32 @@ def simulate_user_session(connection, user, previous_session=None): action = UserAction( session_id=session_id, user_id=user.id, - type=action_type, - time=action_time, + action_type=action_type, + action_time=action_time, message=message) previous_action_time = action_time - actions.append(action) + session.add(action) - logout_time = ( + user_session.logout_time = ( previous_action_time + datetime.timedelta(seconds=(1 + random.randrange(59)))) + session.commit() + return user_session - return ( - UserSession( - session_id, - login_time, - logout_time, - user.id, - random_ip()), - actions) - -def simulate_user_history(connection, user): +def simulate_user_history(session, user): """Simulates the entire history of activity for a single user.""" total_sessions = random.randrange(10) - sessions = [] - actions = [] - previous_session = None + previous_user_session = None for _ in range(total_sessions): - session, user_actions = simulate_user_session( - connection, user, previous_session) - sessions.append(session) - actions.extend(user_actions) - previous_session = session - - with connection.cursor() as cursor: - cursor.executemany( - 'INSERT INTO `UserSessions` ' - '(`SessionID`, ' - '`LoginTime`, ' - '`LogoutTime`, ' - '`UserID`, ' - '`IPAddress`) ' - 'VALUES (%s,%s,%s,%s,%s)', - [( - session.id, - session.login.isoformat(' '), - session.logout.isoformat(' '), - session.user_id, - session.ip_address, - ) for session in sessions]) - cursor.executemany( - 'INSERT INTO `UserActions` ' - '(`SessionID`, ' - '`UserID`, ' - '`ActionType`, ' - '`ActionTime`, ' - '`Message`) ' - 'VALUES (%s,%s,%s,%s,%s)', - [( - action.session_id, - action.user_id, - action.type, - action.time.isoformat(' '), - action.message, - ) for action in actions]) - - connection.commit() - - -def run_simulation(connection, users): + user_session = simulate_user_session( + session, user, previous_user_session) + previous_user_session = user_session + + +def run_simulation(session, users): """Simulates app activity for all users.""" for n, user in enumerate(users): @@ -211,21 +184,37 @@ def run_simulation(connection, users): if n % 100 == 0 and n != 0: print('Simulated data for {} users'.format(n)) - simulate_user_history(connection, user) + simulate_user_history(session, user) print('COMPLETE: Simulated data for {} users'.format(len(users))) +def populate_db(session, total_users=3): + """Populate database with total_users simulated users and their actions.""" + users = generate_users(session, total_users) + run_simulation(session, users) + + +def create_session(engine): + Base.metadata.drop_all(engine) + Base.metadata.create_all(engine) + Session = sqlalchemy.orm.sessionmaker(bind=engine) + return Session() + + def main(total_users, host, user, password, db_name): - connection = pymysql.connect( - host=host, user=user, password=password, db=db_name) + engine = sqlalchemy.create_engine( + 'mysql+pymysql://{user}:{password}@{host}/{db_name}'.format( + user=user, + password=parse.quote_plus(password), + host=host, + db_name=db_name)) + session = create_session(engine) try: - users = generate_users(total_users) - insert_users(connection, users) - run_simulation(connection, users) + populate_db(session, total_users) finally: - connection.close() + session.close() if __name__ == '__main__': diff --git a/bigquery/dml/populate_db_test.py b/bigquery/dml/populate_db_test.py new file mode 100644 index 00000000000..7dbef2d15b3 --- /dev/null +++ b/bigquery/dml/populate_db_test.py @@ -0,0 +1,36 @@ +# Copyright 2016 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sqlalchemy + +from populate_db import create_session, populate_db + + +def test_populate_db_populates_users(): + engine = sqlalchemy.create_engine('sqlite://') + session = create_session(engine) + + try: + populate_db(session, total_users=100) + + connection = session.connection().connection + cursor = connection.cursor() + cursor.execute('SELECT COUNT(*) FROM Users') + assert cursor.fetchone()[0] == 100 + cursor.execute('SELECT COUNT(*) FROM UserSessions') + assert cursor.fetchone()[0] >= 100 + cursor.execute('SELECT COUNT(*) FROM UserActions') + assert cursor.fetchone()[0] >= 100 + finally: + session.close() diff --git a/bigquery/dml/requirements.txt b/bigquery/dml/requirements.txt new file mode 100644 index 00000000000..8c09e7e069b --- /dev/null +++ b/bigquery/dml/requirements.txt @@ -0,0 +1,5 @@ +flake8==3.0.4 +gcloud==0.18.1 +PyMySQL==0.7.7 +six==1.10.0 +SQLAlchemy==1.0.15 From c677b352e027025b2b69a30d0705ed278a3585cb Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 16 Sep 2016 16:53:50 -0700 Subject: [PATCH 3/6] Add BigQuery DML sample for inserts. This sample reads a SQL file (for example: one that was output from mysqldump) and executes each line as a query. At least in my configuration of mysqldump, each insert statement was on a single line, so I was able to write data from MySQL to BigQuery with this sample. Change-Id: Id14b648b0ce6bac651e436d402f480c56d80bd37 --- bigquery/dml/insert_sql.py | 96 ++++++++++++++++++++++ bigquery/dml/insert_sql_test.py | 32 ++++++++ bigquery/dml/resources/insert_sql_test.sql | 6 ++ 3 files changed, 134 insertions(+) create mode 100644 bigquery/dml/insert_sql.py create mode 100644 bigquery/dml/insert_sql_test.py create mode 100644 bigquery/dml/resources/insert_sql_test.sql diff --git a/bigquery/dml/insert_sql.py b/bigquery/dml/insert_sql.py new file mode 100644 index 00000000000..492519368b1 --- /dev/null +++ b/bigquery/dml/insert_sql.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python + +# Copyright 2016 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Sample to run line-separated SQL statements in Big Query from a file. + +This could be used to run INSERT DML statements from a mysqldump output such as + + mysqldump --user=root \ + --password='secret-password' \ + --host=127.0.0.1 \ + --no-create-info sample_db \ + --skip-add-locks > sample_db_export.sql + +To run, first create tables with the same names and columns as the sample +database. Then run this script. + + python insert_sql.py \ + --project=my-project \ + --default_dataset=my_db \ + sample_db_export.sql +""" + +from __future__ import print_function + +import argparse +import time + +from gcloud import bigquery +from gcloud import exceptions + + +def retry_query(query, times=3): + + for attempt in range(times): + + try: + query.run() + return + except exceptions.GCloudError as err: + + if attempt == times - 1: + print('Giving up') + raise + + print('Retrying, got error: {}'.format(err)) + time.sleep(1) + + +def insert_sql(sql_path, project=None, default_dataset=None): + client = bigquery.Client(project=project) + + with open(sql_path) as f: + for line in f: + line = line.strip() + + # Ignore blank lines and comments. + if line == '' or line.startswith('--') or line.startswith('/*'): + continue + + print('Running query: {}{}'.format( + line[:60], + '...' if len(line) > 60 else '')) + query = client.run_sync_query(line) + query.use_legacy_sql = False + + if default_dataset is not None: + query.default_dataset = client.dataset(default_dataset) + + retry_query(query) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument('--project', help='Google Cloud project name') + parser.add_argument( + '--default-dataset', help='Default BigQuery dataset name') + parser.add_argument('sql_path', help='Path to SQL file') + + args = parser.parse_args() + + insert_sql(args.sql_path, args.project, args.default_dataset) diff --git a/bigquery/dml/insert_sql_test.py b/bigquery/dml/insert_sql_test.py new file mode 100644 index 00000000000..694c3c7f882 --- /dev/null +++ b/bigquery/dml/insert_sql_test.py @@ -0,0 +1,32 @@ +# Copyright 2016 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os.path + +from insert_sql import insert_sql + + +def test_insert_sql(cloud_config, capsys): + sql_path = os.path.join( + os.path.dirname(__file__), + 'resources', + 'insert_sql_test.sql') + + insert_sql(sql_path, default_dataset='test_dataset') + + out, _ = capsys.readouterr() + + assert ( + 'INSERT INTO `test_table` (`Name`) VALUES (\'hello world\')' + in out) diff --git a/bigquery/dml/resources/insert_sql_test.sql b/bigquery/dml/resources/insert_sql_test.sql new file mode 100644 index 00000000000..42f5dabdacb --- /dev/null +++ b/bigquery/dml/resources/insert_sql_test.sql @@ -0,0 +1,6 @@ +-- This file is used to test ../insert_sql.py. +-- These are comments. +-- Each query to be executed should be on a single line. + +/* Another ignored line. */ +INSERT INTO `test_table` (`Name`) VALUES ('hello world') From 16f4835bde4310beacd6fb82e84b99560f211571 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 26 Sep 2016 10:59:07 -0700 Subject: [PATCH 4/6] Add region tags for DML sample. Also, adds a few explanatory comments for the docs. Change-Id: I623bf226839ab43f8da8297938223323a04e5838 --- bigquery/dml/insert_sql.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/bigquery/dml/insert_sql.py b/bigquery/dml/insert_sql.py index 492519368b1..318f7be162c 100644 --- a/bigquery/dml/insert_sql.py +++ b/bigquery/dml/insert_sql.py @@ -36,6 +36,7 @@ from __future__ import print_function import argparse +# [START insert_sql] import time from gcloud import bigquery @@ -43,6 +44,7 @@ def retry_query(query, times=3): + """Retry a query up to some number of times.""" for attempt in range(times): @@ -60,6 +62,8 @@ def retry_query(query, times=3): def insert_sql(sql_path, project=None, default_dataset=None): + """Run all the SQL statements in a SQL file.""" + client = bigquery.Client(project=project) with open(sql_path) as f: @@ -74,12 +78,19 @@ def insert_sql(sql_path, project=None, default_dataset=None): line[:60], '...' if len(line) > 60 else '')) query = client.run_sync_query(line) + + # Set use_legacy_sql to False to enable standard SQL syntax. + # This is required to use the Data Manipulation Language features. + # + # For more information about enabling standard SQL, see: + # https://cloud.google.com/bigquery/sql-reference/enabling-standard-sql query.use_legacy_sql = False if default_dataset is not None: query.default_dataset = client.dataset(default_dataset) retry_query(query) +# [END insert_sql] if __name__ == "__main__": From cc92c05e3e6e948d8ddfc0dfd9a24327fd32831a Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 28 Sep 2016 10:43:39 -0700 Subject: [PATCH 5/6] Add docs link to BigQuery DML README. Change-Id: Ie0d12ac9aedfdd83d2f6f533ad30265f75126e4f --- bigquery/dml/README.md | 7 +++++++ scripts/resources/docs-links.json | 3 +++ 2 files changed, 10 insertions(+) diff --git a/bigquery/dml/README.md b/bigquery/dml/README.md index 401ae455da6..39b167df93e 100644 --- a/bigquery/dml/README.md +++ b/bigquery/dml/README.md @@ -1,5 +1,12 @@ # BigQuery DML Samples + +These samples are used on the following documentation page: + +> https://cloud.google.combigquery/docs/loading-data-sql-dml + + + To create a test database, run the `populate_db.py` script. ``` diff --git a/scripts/resources/docs-links.json b/scripts/resources/docs-links.json index 60ea2ec76bf..19a77f6abdc 100644 --- a/scripts/resources/docs-links.json +++ b/scripts/resources/docs-links.json @@ -361,6 +361,9 @@ "/bigquery/docs/data": [ "bigquery/api/sync_query.py" ], + "bigquery/docs/loading-data-sql-dml": [ + "bigquery/dml/insert_sql.py" + ], "/appengine/docs/python/memcache/examples": [ "appengine/memcache/snippets/snippets.py", "appengine/memcache/guestbook/main.py" From ef345a225996e394a6bc87297869b6180604bf87 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 28 Sep 2016 11:58:28 -0700 Subject: [PATCH 6/6] Simplify DML samples. Removes unnecessary UserActions table from populate_db.py. Removes retry logic and changes sample to only look for INSERT lines in insert_sql.py Change-Id: If8994c420cd95babf3c4673a3b87affbfca4f32a --- bigquery/dml/insert_sql.py | 53 +++++++---------------------- bigquery/dml/insert_sql_test.py | 2 +- bigquery/dml/populate_db.py | 57 ++------------------------------ bigquery/dml/populate_db_test.py | 8 ++--- 4 files changed, 19 insertions(+), 101 deletions(-) diff --git a/bigquery/dml/insert_sql.py b/bigquery/dml/insert_sql.py index 318f7be162c..b79699e9ccf 100644 --- a/bigquery/dml/insert_sql.py +++ b/bigquery/dml/insert_sql.py @@ -14,9 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Sample to run line-separated SQL statements in Big Query from a file. +"""Sample that runs a file containing INSERT SQL statements in Big Query. -This could be used to run INSERT DML statements from a mysqldump output such as +This could be used to run the INSERT statements in a mysqldump output such as mysqldump --user=root \ --password='secret-password' \ @@ -27,41 +27,16 @@ To run, first create tables with the same names and columns as the sample database. Then run this script. - python insert_sql.py \ - --project=my-project \ - --default_dataset=my_db \ - sample_db_export.sql + python insert_sql.py my-project my_dataset sample_db_export.sql """ -from __future__ import print_function - -import argparse # [START insert_sql] -import time +import argparse from gcloud import bigquery -from gcloud import exceptions - - -def retry_query(query, times=3): - """Retry a query up to some number of times.""" - - for attempt in range(times): - - try: - query.run() - return - except exceptions.GCloudError as err: - if attempt == times - 1: - print('Giving up') - raise - print('Retrying, got error: {}'.format(err)) - time.sleep(1) - - -def insert_sql(sql_path, project=None, default_dataset=None): +def insert_sql(project, default_dataset, sql_path): """Run all the SQL statements in a SQL file.""" client = bigquery.Client(project=project) @@ -70,8 +45,7 @@ def insert_sql(sql_path, project=None, default_dataset=None): for line in f: line = line.strip() - # Ignore blank lines and comments. - if line == '' or line.startswith('--') or line.startswith('/*'): + if not line.startswith('INSERT'): continue print('Running query: {}{}'.format( @@ -85,23 +59,20 @@ def insert_sql(sql_path, project=None, default_dataset=None): # For more information about enabling standard SQL, see: # https://cloud.google.com/bigquery/sql-reference/enabling-standard-sql query.use_legacy_sql = False - - if default_dataset is not None: - query.default_dataset = client.dataset(default_dataset) - - retry_query(query) -# [END insert_sql] + query.default_dataset = client.dataset(default_dataset) + query.run() if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) - parser.add_argument('--project', help='Google Cloud project name') + parser.add_argument('project', help='Google Cloud project name') parser.add_argument( - '--default-dataset', help='Default BigQuery dataset name') + 'default_dataset', help='Default BigQuery dataset name') parser.add_argument('sql_path', help='Path to SQL file') args = parser.parse_args() - insert_sql(args.sql_path, args.project, args.default_dataset) + insert_sql(args.project, args.default_dataset, args.sql_path) +# [END insert_sql] diff --git a/bigquery/dml/insert_sql_test.py b/bigquery/dml/insert_sql_test.py index 694c3c7f882..b2e295906ac 100644 --- a/bigquery/dml/insert_sql_test.py +++ b/bigquery/dml/insert_sql_test.py @@ -23,7 +23,7 @@ def test_insert_sql(cloud_config, capsys): 'resources', 'insert_sql_test.sql') - insert_sql(sql_path, default_dataset='test_dataset') + insert_sql(cloud_config.project, 'test_dataset', sql_path) out, _ = capsys.readouterr() diff --git a/bigquery/dml/populate_db.py b/bigquery/dml/populate_db.py index f0eac12869e..ad05a88a732 100755 --- a/bigquery/dml/populate_db.py +++ b/bigquery/dml/populate_db.py @@ -61,27 +61,11 @@ class UserSession(Base): ip_address = sqlalchemy.Column(sqlalchemy.String(length=40)) -class UserAction(Base): - __tablename__ = 'UserActions' - - id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True) - session_id = sqlalchemy.Column( - sqlalchemy.String(length=36), - sqlalchemy.ForeignKey('UserSessions.id')) - user_id = sqlalchemy.Column( - sqlalchemy.Integer, sqlalchemy.ForeignKey('Users.id')) - action_type = sqlalchemy.Column(sqlalchemy.String(length=64)) - action_time = sqlalchemy.Column(sqlalchemy.DateTime) - message = sqlalchemy.Column(sqlalchemy.Text) - - def generate_users(session, num_users): users = [] for userid in range(1, num_users + 1): - # Add more users at the end of 2016 than the beginning. - # https://en.wikipedia.org/wiki/Beta_distribution - year_portion = random.betavariate(3, 1) + year_portion = random.random() date_joined = datetime.datetime.fromtimestamp( TIMESTAMP_2016 + SECONDS_IN_2016 * year_portion) user = User(id=userid, date_joined=date_joined) @@ -123,45 +107,11 @@ def simulate_user_session(session, user, previous_user_session=None): user_id=user.id, login_time=login_time, ip_address=random_ip()) - session.add(user_session) - session.commit() - previous_action_time = login_time - total_actions = random.randrange(10) + 1 - - for _ in range(total_actions): - action_type = random.choice(['CLICKED', 'PURCHASED']) - action_time = ( - previous_action_time + - datetime.timedelta(seconds=random.randrange(59) + 1)) - message = 'breed={}'.format( - random.choice([ - 'Albera', - 'Angus', - 'Beefalo', - 'Droughtmaster', - 'Longhorn', - 'Guernsey', - 'Highland', - 'Holstein', - 'Jersey', - 'Normande', - 'Shetland', - 'Wagyu', - ])) - action = UserAction( - session_id=session_id, - user_id=user.id, - action_type=action_type, - action_time=action_time, - message=message) - - previous_action_time = action_time - session.add(action) - user_session.logout_time = ( - previous_action_time + + login_time + datetime.timedelta(seconds=(1 + random.randrange(59)))) session.commit() + session.add(user_session) return user_session @@ -180,7 +130,6 @@ def run_simulation(session, users): """Simulates app activity for all users.""" for n, user in enumerate(users): - if n % 100 == 0 and n != 0: print('Simulated data for {} users'.format(n)) diff --git a/bigquery/dml/populate_db_test.py b/bigquery/dml/populate_db_test.py index 7dbef2d15b3..66775834c82 100644 --- a/bigquery/dml/populate_db_test.py +++ b/bigquery/dml/populate_db_test.py @@ -22,15 +22,13 @@ def test_populate_db_populates_users(): session = create_session(engine) try: - populate_db(session, total_users=100) + populate_db(session, total_users=10) connection = session.connection().connection cursor = connection.cursor() cursor.execute('SELECT COUNT(*) FROM Users') - assert cursor.fetchone()[0] == 100 + assert cursor.fetchone()[0] == 10 cursor.execute('SELECT COUNT(*) FROM UserSessions') - assert cursor.fetchone()[0] >= 100 - cursor.execute('SELECT COUNT(*) FROM UserActions') - assert cursor.fetchone()[0] >= 100 + assert cursor.fetchone()[0] >= 10 finally: session.close()