Skip to content

Add DML samples for BigQuery. #546

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 28, 2016
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bigquery/dml/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sample_db_export.sql
15 changes: 15 additions & 0 deletions bigquery/dml/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# BigQuery DML Samples

<!-- auto-doc-link -->
These samples are used on the following documentation page:

> https://cloud.google.combigquery/docs/loading-data-sql-dml

<!-- end-auto-doc-link -->

To create a test database, run the `populate_db.py` script.

```
python populate_db.py 100 localhost root 'mysql-password' sample_db
```

107 changes: 107 additions & 0 deletions bigquery/dml/insert_sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#!/usr/bin/env python

# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Sample to run line-separated SQL statements in Big Query from a file.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awkward phrasing, maybe: Sample that runs a file containing line-separated SQL statements in Big Query.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. (I realized this sample was a bit more complicated than it needed to be, too. I changed it to "Sample that runs a file containing INSERT SQL statements in Big Query." and modified the loop to look for lines that start with INSERT (to match the command-line sample I wrote for the docs)


This could be used to run INSERT DML statements from a mysqldump output such as

mysqldump --user=root \
--password='secret-password' \
--host=127.0.0.1 \
--no-create-info sample_db \
--skip-add-locks > sample_db_export.sql

To run, first create tables with the same names and columns as the sample
database. Then run this script.

python insert_sql.py \
--project=my-project \
--default_dataset=my_db \
sample_db_export.sql
"""

from __future__ import print_function

import argparse
# [START insert_sql]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either include all imports, or include none of them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. (Included all)

import time

from gcloud import bigquery
from gcloud import exceptions


def retry_query(query, times=3):
"""Retry a query up to some number of times."""

for attempt in range(times):

try:
query.run()
return
except exceptions.GCloudError as err:

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This newline isn't needed (blank newline is recommended between expressions and statements, but not between two statements)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, good to know. I deleted this function entirely based on your previous feedback. (I made some command-line samples to do the same thing and it really does complicate it a lot to add retries. It doesn't seem to be helping for most errors, anyway)

if attempt == times - 1:
print('Giving up')
raise

print('Retrying, got error: {}'.format(err))
time.sleep(1)


def insert_sql(sql_path, project=None, default_dataset=None):
"""Run all the SQL statements in a SQL file."""

client = bigquery.Client(project=project)

with open(sql_path) as f:
for line in f:
line = line.strip()

# Ignore blank lines and comments.
if line == '' or line.startswith('--') or line.startswith('/*'):
continue

print('Running query: {}{}'.format(
line[:60],
'...' if len(line) > 60 else ''))
query = client.run_sync_query(line)

# Set use_legacy_sql to False to enable standard SQL syntax.
# This is required to use the Data Manipulation Language features.
#
# For more information about enabling standard SQL, see:
# https://cloud.google.com/bigquery/sql-reference/enabling-standard-sql
query.use_legacy_sql = False

if default_dataset is not None:
query.default_dataset = client.dataset(default_dataset)

retry_query(query)
# [END insert_sql]


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('--project', help='Google Cloud project name')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use positional command-line args for required items, use flags for items with sensible defaults.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

parser.add_argument(
'--default-dataset', help='Default BigQuery dataset name')
parser.add_argument('sql_path', help='Path to SQL file')

args = parser.parse_args()

insert_sql(args.sql_path, args.project, args.default_dataset)
32 changes: 32 additions & 0 deletions bigquery/dml/insert_sql_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os.path

from insert_sql import insert_sql


def test_insert_sql(cloud_config, capsys):
sql_path = os.path.join(
os.path.dirname(__file__),
'resources',
'insert_sql_test.sql')

insert_sql(sql_path, default_dataset='test_dataset')

out, _ = capsys.readouterr()

assert (
'INSERT INTO `test_table` (`Name`) VALUES (\'hello world\')'
in out)
233 changes: 233 additions & 0 deletions bigquery/dml/populate_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
#!/usr/bin/env python

# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Command-line tool to simulate user actions and write to SQL database.
"""

from __future__ import division

import argparse
import datetime
import random
import uuid

from six.moves.urllib import parse
import sqlalchemy
from sqlalchemy.ext import declarative
import sqlalchemy.orm


SECONDS_IN_DAY = 24 * 60 * 60
SECONDS_IN_2016 = 366 * SECONDS_IN_DAY

# Unix timestamp for the beginning of 2016.
# http://stackoverflow.com/a/19801806/101923
TIMESTAMP_2016 = (
datetime.datetime(2016, 1, 1, 0, 0, 0) -
datetime.datetime.fromtimestamp(0)).total_seconds()


Base = declarative.declarative_base()


class User(Base):
__tablename__ = 'Users'

id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
date_joined = sqlalchemy.Column(sqlalchemy.DateTime)


class UserSession(Base):
__tablename__ = 'UserSessions'

id = sqlalchemy.Column(sqlalchemy.String(length=36), primary_key=True)
user_id = sqlalchemy.Column(
sqlalchemy.Integer, sqlalchemy.ForeignKey('Users.id'))
login_time = sqlalchemy.Column(sqlalchemy.DateTime)
logout_time = sqlalchemy.Column(sqlalchemy.DateTime)
ip_address = sqlalchemy.Column(sqlalchemy.String(length=40))


class UserAction(Base):
__tablename__ = 'UserActions'

id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
session_id = sqlalchemy.Column(
sqlalchemy.String(length=36),
sqlalchemy.ForeignKey('UserSessions.id'))
user_id = sqlalchemy.Column(
sqlalchemy.Integer, sqlalchemy.ForeignKey('Users.id'))
action_type = sqlalchemy.Column(sqlalchemy.String(length=64))
action_time = sqlalchemy.Column(sqlalchemy.DateTime)
message = sqlalchemy.Column(sqlalchemy.Text)


def generate_users(session, num_users):
users = []

for userid in range(1, num_users + 1):
# Add more users at the end of 2016 than the beginning.
# https://en.wikipedia.org/wiki/Beta_distribution
year_portion = random.betavariate(3, 1)
date_joined = datetime.datetime.fromtimestamp(
TIMESTAMP_2016 + SECONDS_IN_2016 * year_portion)
user = User(id=userid, date_joined=date_joined)
users.append(user)
session.add(user)

session.commit()
return users


def random_ip():
"""Choose a random example IP address.

Examples are chosen from the test networks described in
https://tools.ietf.org/html/rfc5737
"""
network = random.choice([
'192.0.2', # RFC-5737 TEST-NET-1
'198.51.100', # RFC-5737 TEST-NET-2
'203.0.113', # RFC-5737 TEST-NET-3
])
ip_address = '{}.{}'.format(network, random.randrange(256))
return ip_address


def simulate_user_session(session, user, previous_user_session=None):
"""Simulates a single session (login to logout) of a user's history."""
login_time = user.date_joined

if previous_user_session is not None:
login_time = (
previous_user_session.logout_time +
datetime.timedelta(
days=1, seconds=random.randrange(SECONDS_IN_DAY)))

session_id = str(uuid.uuid4())
user_session = UserSession(
id=session_id,
user_id=user.id,
login_time=login_time,
ip_address=random_ip())
session.add(user_session)
session.commit()
previous_action_time = login_time
total_actions = random.randrange(10) + 1

for _ in range(total_actions):
action_type = random.choice(['CLICKED', 'PURCHASED'])
action_time = (
previous_action_time +
datetime.timedelta(seconds=random.randrange(59) + 1))
message = 'breed={}'.format(
random.choice([
'Albera',
'Angus',
'Beefalo',
'Droughtmaster',
'Longhorn',
'Guernsey',
'Highland',
'Holstein',
'Jersey',
'Normande',
'Shetland',
'Wagyu',
]))
action = UserAction(
session_id=session_id,
user_id=user.id,
action_type=action_type,
action_time=action_time,
message=message)

previous_action_time = action_time
session.add(action)

user_session.logout_time = (
previous_action_time +
datetime.timedelta(seconds=(1 + random.randrange(59))))
session.commit()
return user_session


def simulate_user_history(session, user):
"""Simulates the entire history of activity for a single user."""
total_sessions = random.randrange(10)
previous_user_session = None

for _ in range(total_sessions):
user_session = simulate_user_session(
session, user, previous_user_session)
previous_user_session = user_session


def run_simulation(session, users):
"""Simulates app activity for all users."""

for n, user in enumerate(users):

if n % 100 == 0 and n != 0:
print('Simulated data for {} users'.format(n))

simulate_user_history(session, user)

print('COMPLETE: Simulated data for {} users'.format(len(users)))


def populate_db(session, total_users=3):
"""Populate database with total_users simulated users and their actions."""
users = generate_users(session, total_users)
run_simulation(session, users)


def create_session(engine):
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
Session = sqlalchemy.orm.sessionmaker(bind=engine)
return Session()


def main(total_users, host, user, password, db_name):
engine = sqlalchemy.create_engine(
'mysql+pymysql://{user}:{password}@{host}/{db_name}'.format(
user=user,
password=parse.quote_plus(password),
host=host,
db_name=db_name))
session = create_session(engine)

try:
populate_db(session, total_users)
finally:
session.close()


if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument(
'total_users', help='How many simulated users to create.', type=int)
parser.add_argument('host', help='Host of the database to write to.')
parser.add_argument('user', help='User to connect to the database.')
parser.add_argument('password', help='Password for the database user.')
parser.add_argument('db', help='Name of the database to write to.')

args = parser.parse_args()

main(args.total_users, args.host, args.user, args.password, args.db)
Loading