Cet article est l'article du 6ème jour du Calendrier de l'Avent 2014 du blog de l'ingénieur VOYAGE GROUP.
Bonjour, est engagé dans les affaires de scientifique de données de temps libre dans VOYAGE GROUP [@ hagino3000] est (https://twitter.com/hagino3000).
Je pense que de nombreuses personnes ont commencé à intégrer des données à des fins d'analyse dans BigQuery en suivant le mouvement BigQuery ces jours-ci. Cependant, lorsque BigQuery est utilisé, le code de test tel que les lots agrégés ne sera pas terminé dans l'environnement local et vous souhaiterez vous référer à BigQuery lui-même. Cet article présente plusieurs approches.
L'exemple de code utilise Python + nose + BigQuery-Python.
La raison de s'inquiéter à propos du code de test est que BigQuery possède les deux fonctionnalités suivantes.
D'autant que la requête prend beaucoup de temps, je veux raccourcir cela dans le test.
Le code de test BigQuery-Python, par exemple, n'accède pas du tout à BigQuery.
https://github.com/tylertreat/BigQuery-Python/blob/master/bigquery/tests/test_client.py
Quoi qu'il en soit, il a le mérite de fonctionner à grande vitesse, mais il n'est pas possible de confirmer si le processus INSERT et l'instruction SELECT fonctionnent réellement. De plus, le code de test est plein de Mock.
Pour faire simple, il serait bien d'avoir un ensemble de données pour les tests unitaires, mais comme cela interfère lorsque plusieurs personnes exécutent le test en même temps, un ensemble de données est requis pour chaque test. Faites la même chose que Django fait avec Create Database chaque fois que vous exécutez un test.
Tout d'abord, le processus de création d'un ensemble de données jetables (+ table).
tests/helper.py
# coding=utf-8
from datetime import datetime
import glob
import json
import os
import random
import re
def setup_dataset(client, test_name):
"""
Préparer un ensemble de données pour les tests
Parameters
----------
client : bigquery.client
See https://github.com/tylertreat/BigQuery-Python
Returns
-------
dataset_id : string
ID de l'ensemble de données créé(ex. ut_hoge_test_359103)
schemas : dict (key: string, value: list)
La clé est le nom de la table et la valeur est la liste de définition de schéma utilisée pour créer la table.
"""
#Créer un jeu de données
dataset_id = 'ut_%s_%d' % (test_name ,int(random.random() * 1000000))
client.create_dataset(
dataset_id,
friendly_name='For unit test started at %s' % datetime.now())
#Créer une table à partir d'un fichier de définition de schéma
schemas = {}
BASE_DIR = os.path.normpath(os.path.join(os.path.dirname(__file__), '../'))
for schema_file in glob.glob(os.path.join(BASE_DIR, 'schema/*.json')):
table_name = re.search(r'([^/.]+).json$', schema_file).group(1)
schema = json.loads(open(schema_file).read())
schemas[table_name] = schema
client.create_table(dataset_id, table_name, schema)
return dataset_id, schemas
Créez un ensemble de données avec le corps de test et la configuration. De plus, afin d'utiliser cet ensemble de données pour le traitement à tester, le processus d'acquisition de l'ID de l'ensemble de données est converti en Mock.
test_hoge.py
# coding=utf-8
import time
import mock
from nose.tools import eq_
from nose.plugins.attrib import attr
import myapp.bq
import myapp.calc_daily_state
from . import helper
dataset_id = None
bq_client = None
#Exécuter en parallèle
_multiprocess_can_split_ = True
def setup():
global dataset_id
global bq_client
# BigQuery-Obtenir une instance de client Python
bq_client = myapp.bq.get_client(readonly=False)
#Créer un jeu de données
dataset_id, schemas = helper.setup_dataset(bq_client, 'test_hoge')
#Simulez le processus pour obtenir l'ID de l'ensemble de données
myapp.bq.get_dataset_id = mock.Mock(return_value=dataset_id)
#INSÉRER des données de test
bq_client.push_rows(dataset_id, 'events', [....Abréviation....])
#Il peut ne pas être possible d'interroger immédiatement après INSERT, alors dormez
time.sleep(10)
@attr('slow')
def test_calc_dau():
#Tests faisant référence à BigQuery
ret = myapp.calc_daily_state.calc_dau('2014/08/01')
eq_(ret, "....Abréviation....")
@attr('slow')
def test_calc_new_user():
#Tests faisant référence à BigQuery
ret = myapp.calc_daily_state.calc_new_user('2014/08/01')
eq_(ret, "....Abréviation....")
def teadown():
#Il semble préférable de supprimer l'ensemble de données et de le laisser en cas d'échec du test
bq_client.delete_dataset(dataset_id)
Dans cet exemple, le traitement à tester était supposé être en lecture seule, de sorte que l'ensemble de données n'a été créé qu'une seule fois. Cela prend 5 secondes pour chaque cas de test, donc je veux adhérer à 1 test et 1 assertion.
Il faut environ 1 seconde pour créer un jeu de données de configuration et charger les données. Étant donné que chaque cas prend du temps, il est possible de raccourcir le temps dans une certaine mesure en parallélisant.
#5 Exécuter des tests en parallèle
nosetests --processes=5 --process-timeout=30
Multiprocess: parallel testing — nose 1.3.4 documentation http://nose.readthedocs.org/en/latest/plugins/multiprocess.html
Dans l'exemple ci-dessus, l'ensemble de données a été créé par la configuration du module, mais quand il s'agit de tester un processus avec INSERT, il est nécessaire d'éliminer l'influence entre les tests. Cela prendra encore plus de temps. Parce que si vous essayez de vérifier le résultat immédiatement après INSERT et d'exécuter la requête, vous n'obtiendrez pas le résultat. Après INSERT, vous devez dormir pendant quelques secondes, puis exécuter la requête (qui prend environ 5 secondes) pour vérifier le résultat.
test_fuga.py
#Exécuter en parallèle
_multiprocess_can_split_ = True
@attr('slow')
class TestFugaMethodsWhichHasInsert(object):
def setup(self):
#Créer un jeu de données
(Abréviation)
self.dataset_id = dataset_id
self.bq_client = bq_client
def test_insert_foo(self):
#Tester le traitement avec INSERT
def test_insert_bar(self):
#Tester le traitement avec INSERT
def teardown(self):
self.bq_client.delete_dataset(self.dataset_id)
Il est inévitable de s'endormir à la fin du test, laissons donc à l'outil CI. Dans ce cas, les données BigQuery peuvent être exécutées en parallèle, car l'influence entre les tests peut être supprimée.
Seule la méthode d'émission d'une requête et d'insertion de données utilise le BigQuery réel et sépare les répertoires comme un test lent. Pour les autres traitements, la partie qui renvoie le résultat de la requête est convertie en Mock.
N'écrivez pas de tests dans le code de la tâche d'analyse
Attendez que quelque chose comme DynamoDB Local apparaisse. Ou faites-le.
Il n'existe actuellement aucune option indiquant qu'il s'agit de la meilleure solution.Par conséquent, si vous souhaitez réduire Mock et simplifier votre code de test, reportez-vous directement à BigQuery et vice versa, utilisez Mock. Supprimez les dépendances entre les tests afin que les tests puissent être exécutés en parallèle. La création d'un ensemble de données ne prend pas longtemps, vous pouvez donc le créer pour chaque test.
J'apprécierais que vous me disiez s'il y a un meilleur modèle.
Les frais de demain sont de @brtriver, attendez-le avec impatience.
Voyons ce qu'il advient du code de test de la commande bq faite par Python. Vous pouvez émettre une requête avec
bq query xxx```, vous devriez donc pouvoir avoir un tel test.
https://code.google.com/p/google-bigquery-tools/source/browse/bq/bigquery_client_test.py
Il n'y a pas de test pour exécuter la requête. (´ ・ ω ・ `)
Recommended Posts