Skip to content

Add DML samples for BigQuery. #546

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bigquery/dml/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sample_db_export.sql
15 changes: 15 additions & 0 deletions bigquery/dml/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# BigQuery DML Samples

<!-- auto-doc-link -->
These samples are used on the following documentation page:

> https://cloud.google.com/bigquery/docs/loading-data-sql-dml

<!-- end-auto-doc-link -->

To create a test database, run the `populate_db.py` script.

```
python populate_db.py 100 localhost root 'mysql-password' sample_db
```

78 changes: 78 additions & 0 deletions bigquery/dml/insert_sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#!/usr/bin/env python

# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Sample that runs a file containing INSERT SQL statements in Big Query.

This could be used to run the INSERT statements in a mysqldump output such as

mysqldump --user=root \
--password='secret-password' \
--host=127.0.0.1 \
--no-create-info sample_db \
--skip-add-locks > sample_db_export.sql

To run, first create tables with the same names and columns as the sample
database. Then run this script.

python insert_sql.py my-project my_dataset sample_db_export.sql
"""

# [START insert_sql]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either include all imports, or include none of them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. (Included all)

import argparse

from gcloud import bigquery

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This newline isn't needed (blank newline is recommended between expressions and statements, but not between two statements)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, good to know. I deleted this function entirely based on your previous feedback. (I made some command-line samples to do the same thing and it really does complicate it a lot to add retries. It doesn't seem to be helping for most errors, anyway)


def insert_sql(project, default_dataset, sql_path):
    """Execute each INSERT statement found in a SQL file on BigQuery.

    The file is read line by line. A line is run as a query only if, after
    surrounding whitespace is stripped, it starts with ``INSERT``; every
    other line is skipped.

    Args:
        project: Project passed to ``bigquery.Client``.
        default_dataset: Name of the dataset used as each query's default.
        sql_path: Path of the SQL file to read.
    """
    client = bigquery.Client(project=project)

    with open(sql_path) as sql_file:
        statements = (raw_line.strip() for raw_line in sql_file)

        for statement in statements:
            if not statement.startswith('INSERT'):
                continue

            # Echo only the first 60 characters, marking any truncation.
            print('Running query: {}{}'.format(
                statement[:60],
                '...' if len(statement) > 60 else ''))

            query = client.run_sync_query(statement)

            # Set use_legacy_sql to False to enable standard SQL syntax.
            # This is required to use the Data Manipulation Language
            # features.
            #
            # For more information about enabling standard SQL, see:
            # https://cloud.google.com/bigquery/sql-reference/enabling-standard-sql
            query.use_legacy_sql = False
            query.default_dataset = client.dataset(default_dataset)
            query.run()


if __name__ == "__main__":
    # Command-line entry point: three positional arguments, forwarded
    # unchanged to insert_sql().
    arg_parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    arg_parser.add_argument('project', help='Google Cloud project name')
    arg_parser.add_argument(
        'default_dataset', help='Default BigQuery dataset name')
    arg_parser.add_argument('sql_path', help='Path to SQL file')
    cli_args = arg_parser.parse_args()

    insert_sql(
        cli_args.project,
        cli_args.default_dataset,
        cli_args.sql_path)
# [END insert_sql]
32 changes: 32 additions & 0 deletions bigquery/dml/insert_sql_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os.path

from insert_sql import insert_sql


def test_insert_sql(cloud_config, capsys):
    """Run insert_sql on the bundled SQL fixture and check the echoed query."""
    test_dir = os.path.dirname(__file__)
    sql_path = os.path.join(test_dir, 'resources', 'insert_sql_test.sql')

    insert_sql(cloud_config.project, 'test_dataset', sql_path)

    # insert_sql prints each statement it runs; the fixture's only INSERT
    # line must therefore appear on stdout.
    captured_stdout, _ = capsys.readouterr()
    expected_query = (
        'INSERT INTO `test_table` (`Name`) VALUES (\'hello world\')')
    assert expected_query in captured_stdout
182 changes: 182 additions & 0 deletions bigquery/dml/populate_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
#!/usr/bin/env python

# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Command-line tool to simulate user actions and write to SQL database.
"""

from __future__ import division

import argparse
import datetime
import random
import uuid

from six.moves.urllib import parse
import sqlalchemy
from sqlalchemy.ext import declarative
import sqlalchemy.orm


# Durations, in seconds, used to place simulated events within 2016.
SECONDS_IN_DAY = 60 * 60 * 24
SECONDS_IN_2016 = SECONDS_IN_DAY * 366  # 2016 is a leap year.

# Unix timestamp for the beginning of 2016, computed as the offset of
# 2016-01-01 00:00:00 from datetime.datetime.fromtimestamp(0).
# http://stackoverflow.com/a/19801806/101923
TIMESTAMP_2016 = (
    datetime.datetime(2016, 1, 1) - datetime.datetime.fromtimestamp(0)
).total_seconds()


# Declarative base class that the ORM models in this module subclass.
Base = declarative.declarative_base()


class User(Base):
    """ORM model for a simulated user, stored in the ``Users`` table."""

    __tablename__ = 'Users'

    # Integer primary key identifying the user.
    id = sqlalchemy.Column(sqlalchemy.Integer(), primary_key=True)
    # Date and time at which the user joined.
    date_joined = sqlalchemy.Column(sqlalchemy.DateTime())


class UserSession(Base):
    """ORM model for one login-to-logout session (``UserSessions`` table)."""

    __tablename__ = 'UserSessions'

    # String primary key of at most 36 characters.
    id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True)
    # Owning user; references the Users table's id column.
    user_id = sqlalchemy.Column(
        sqlalchemy.Integer(), sqlalchemy.ForeignKey('Users.id'))
    # Start and end of the session.
    login_time = sqlalchemy.Column(sqlalchemy.DateTime())
    logout_time = sqlalchemy.Column(sqlalchemy.DateTime())
    # Textual IP address, at most 40 characters.
    ip_address = sqlalchemy.Column(sqlalchemy.String(40))


def generate_users(session, num_users):
    """Create ``num_users`` users with random join dates and commit them.

    Users receive consecutive ids starting at 1. Each join date is the
    start-of-2016 timestamp plus a uniformly random fraction of the number
    of seconds in 2016.

    Args:
        session: SQLAlchemy session the users are added to and committed on.
        num_users: How many users to create.

    Returns:
        The list of created User objects, in id order.
    """
    created = []

    for user_id in range(1, num_users + 1):
        # Random offset, in seconds, from the beginning of 2016.
        offset_seconds = SECONDS_IN_2016 * random.random()
        joined_at = datetime.datetime.fromtimestamp(
            TIMESTAMP_2016 + offset_seconds)

        new_user = User(id=user_id, date_joined=joined_at)
        session.add(new_user)
        created.append(new_user)

    session.commit()
    return created


def random_ip():
    """Return a random example IPv4 address as a dotted string.

    The first three octets are taken from one of the test networks
    described in https://tools.ietf.org/html/rfc5737; the final octet is a
    random integer from 0 to 255 inclusive.
    """
    test_networks = [
        '192.0.2',  # RFC-5737 TEST-NET-1
        '198.51.100',  # RFC-5737 TEST-NET-2
        '203.0.113',  # RFC-5737 TEST-NET-3
    ]
    prefix = random.choice(test_networks)
    last_octet = random.randrange(256)
    return '{}.{}'.format(prefix, last_octet)


def simulate_user_session(session, user, previous_user_session=None):
    """Simulates a single session (login to logout) of a user's history.

    The first session of a user starts at the user's join date. Each later
    session starts one day plus a random number of seconds (less than a
    full day) after the previous session's logout. Every session lasts
    between 1 and 59 seconds.

    Args:
        session: SQLAlchemy session used to persist the new row.
        user: The User the simulated session belongs to.
        previous_user_session: The user's preceding UserSession, or None
            when this is the user's first session.

    Returns:
        The newly created UserSession, already added to ``session`` and
        committed.
    """
    if previous_user_session is None:
        login_time = user.date_joined
    else:
        login_time = (
            previous_user_session.logout_time +
            datetime.timedelta(
                days=1, seconds=random.randrange(SECONDS_IN_DAY)))

    logout_time = (
        login_time +
        datetime.timedelta(seconds=(1 + random.randrange(59))))

    user_session = UserSession(
        id=str(uuid.uuid4()),
        user_id=user.id,
        login_time=login_time,
        logout_time=logout_time,
        ip_address=random_ip())

    # Add BEFORE committing. The previous order (commit, then add) left
    # each new row pending until the next call, so the last session
    # created was never committed and was lost when the session closed.
    session.add(user_session)
    session.commit()
    return user_session


def simulate_user_history(session, user):
    """Simulates the entire history of activity for a single user.

    A random number of sessions (0 to 9) is generated; each session is
    chained to the one before it so that sessions follow one another in
    time.
    """
    latest_session = None

    for _ in range(random.randrange(10)):
        latest_session = simulate_user_session(
            session, user, latest_session)


def run_simulation(session, users):
    """Simulates app activity for all users.

    Prints a progress line after every 100 users already processed and a
    completion line once every user has been simulated.
    """
    processed = 0

    for user in users:
        # Report progress at 100, 200, ... users (never at zero).
        if processed and processed % 100 == 0:
            print('Simulated data for {} users'.format(processed))

        simulate_user_history(session, user)
        processed += 1

    print('COMPLETE: Simulated data for {} users'.format(len(users)))


def populate_db(session, total_users=3):
    """Populate database with total_users simulated users and their actions.

    Users are generated first; their session histories are then simulated
    on the same SQLAlchemy session.
    """
    run_simulation(session, generate_users(session, total_users))


def create_session(engine):
    """Recreate the model tables on ``engine`` and return a new session.

    All tables registered on ``Base.metadata`` are dropped and then created
    again, so anything previously stored in them is discarded.

    Args:
        engine: SQLAlchemy engine to create the tables on and bind to.

    Returns:
        A new session bound to ``engine``.
    """
    schema = Base.metadata
    schema.drop_all(engine)
    schema.create_all(engine)

    session_factory = sqlalchemy.orm.sessionmaker(bind=engine)
    return session_factory()


def main(total_users, host, user, password, db_name):
    """Fill a MySQL database (via PyMySQL) with simulated user activity.

    Args:
        total_users: How many simulated users to create.
        host: Host of the database to write to.
        user: User to connect to the database.
        password: Password for the database user; it is URL-quoted before
            being placed in the connection URL.
        db_name: Name of the database to write to.
    """
    url_template = 'mysql+pymysql://{user}:{password}@{host}/{db_name}'
    connection_url = url_template.format(
        user=user,
        password=parse.quote_plus(password),
        host=host,
        db_name=db_name)

    engine = sqlalchemy.create_engine(connection_url)
    session = create_session(engine)

    # Always close the session, even if the simulation fails part-way.
    try:
        populate_db(session, total_users)
    finally:
        session.close()


if __name__ == '__main__':
    # Command-line entry point: five positional arguments, forwarded
    # unchanged to main().
    arg_parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    arg_parser.add_argument(
        'total_users', help='How many simulated users to create.', type=int)
    arg_parser.add_argument('host', help='Host of the database to write to.')
    arg_parser.add_argument('user', help='User to connect to the database.')
    arg_parser.add_argument(
        'password', help='Password for the database user.')
    arg_parser.add_argument('db', help='Name of the database to write to.')
    cli_args = arg_parser.parse_args()

    main(
        cli_args.total_users,
        cli_args.host,
        cli_args.user,
        cli_args.password,
        cli_args.db)
34 changes: 34 additions & 0 deletions bigquery/dml/populate_db_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sqlalchemy

from populate_db import create_session, populate_db


def test_populate_db_populates_users():
    """Populate an in-memory SQLite database and check the row counts."""
    engine = sqlalchemy.create_engine('sqlite://')
    session = create_session(engine)

    try:
        populate_db(session, total_users=10)

        # Query through the raw DB-API connection underneath the session.
        raw_cursor = session.connection().connection.cursor()

        raw_cursor.execute('SELECT COUNT(*) FROM Users')
        user_count = raw_cursor.fetchone()[0]
        assert user_count == 10

        raw_cursor.execute('SELECT COUNT(*) FROM UserSessions')
        session_count = raw_cursor.fetchone()[0]
        assert session_count >= 10
    finally:
        session.close()
5 changes: 5 additions & 0 deletions bigquery/dml/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
flake8==3.0.4
gcloud==0.18.1
PyMySQL==0.7.7
six==1.10.0
SQLAlchemy==1.0.15
6 changes: 6 additions & 0 deletions bigquery/dml/resources/insert_sql_test.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- This file is used to test ../insert_sql.py.
-- These are comments.
-- Each query to be executed should be on a single line.

/* Another ignored line. */
INSERT INTO `test_table` (`Name`) VALUES ('hello world')
3 changes: 3 additions & 0 deletions scripts/resources/docs-links.json
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,9 @@
"/bigquery/docs/data": [
"bigquery/api/sync_query.py"
],
"/bigquery/docs/loading-data-sql-dml": [
"bigquery/dml/insert_sql.py"
],
"/appengine/docs/python/memcache/examples": [
"appengine/memcache/snippets/snippets.py",
"appengine/memcache/guestbook/main.py"
Expand Down