Dataflow client library (#2450) · code4ward/python-docs-samples@cbb910e · GitHub
Skip to content

Commit cbb910e

Browse files
author
David Cavazos
authored
Dataflow client library (GoogleCloudPlatform#2450)
* Updated requirements
* Update service naming convention
* Prefer client libraries over shell commands
* Update README format
1 parent 2b2cef9 commit cbb910e

5 files changed

Lines changed: 91 additions & 75 deletions

File tree

dataflow/run_template/README.md

Lines changed: 38 additions & 52 deletions

dataflow/run_template/main.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ def run(project, job, template, parameters=None):
4343
# 'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
4444
# }
4545

46-
service = build('dataflow', 'v1b3')
47-
request = service.projects().templates().launch(
46+
dataflow = build('dataflow', 'v1b3')
47+
request = dataflow.projects().templates().launch(
4848
projectId=project,
4949
gcsPath=template,
5050
body={

dataflow/run_template/main_test.py

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,38 +12,62 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
# To run the tests:
16+
# nox -s "lint(sample='./dataflow/run_template')"
17+
# nox -s "py27(sample='./dataflow/run_template')"
18+
# nox -s "py36(sample='./dataflow/run_template')"
19+
1520
import flask
1621
import json
1722
import os
1823
import pytest
19-
import subprocess as sp
2024
import time
2125

2226
from datetime import datetime
27+
from googleapiclient.discovery import build
28+
from googleapiclient.errors import HttpError
2329
from werkzeug.urls import url_encode
2430

2531
import main
2632

2733
PROJECT = os.environ['GCLOUD_PROJECT']
2834
BUCKET = os.environ['CLOUD_STORAGE_BUCKET']
2935

30-
# Wait time until a job can be cancelled, as a best effort.
31-
# If it fails to be cancelled, the job will run for ~8 minutes.
32-
WAIT_TIME = 5 # seconds
36+
dataflow = build('dataflow', 'v1b3')
3337

3438
# Create a fake "app" for generating test request contexts.
3539
@pytest.fixture(scope="module")
3640
def app():
3741
return flask.Flask(__name__)
3842

3943

40-
def test_run_template_empty_args(app):
44+
def test_run_template_python_empty_args(app):
45+
project = PROJECT
46+
job = datetime.now().strftime('test_run_template_python-%Y%m%d-%H%M%S')
47+
template = 'gs://dataflow-templates/latest/Word_Count'
48+
with pytest.raises(HttpError):
49+
main.run(project, job, template)
50+
51+
52+
def test_run_template_python(app):
53+
project = PROJECT
54+
job = datetime.now().strftime('test_run_template_python-%Y%m%d-%H%M%S')
55+
template = 'gs://dataflow-templates/latest/Word_Count'
56+
parameters = {
57+
'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt',
58+
'output': 'gs://{}/dataflow/wordcount/outputs'.format(BUCKET),
59+
}
60+
res = main.run(project, job, template, parameters)
61+
dataflow_jobs_cancel(res['job']['id'])
62+
63+
64+
def test_run_template_http_empty_args(app):
4165
with app.test_request_context():
4266
with pytest.raises(KeyError):
4367
main.run_template(flask.request)
4468

4569

46-
def test_run_template_url(app):
70+
def test_run_template_http_url(app):
4771
args = {
4872
'project': PROJECT,
4973
'job': datetime.now().strftime('test_run_template_url-%Y%m%d-%H%M%S'),
@@ -54,12 +78,10 @@ def test_run_template_url(app):
5478
with app.test_request_context('/?' + url_encode(args)):
5579
res = main.run_template(flask.request)
5680
data = json.loads(res)
57-
job_id = data['job']['id']
58-
time.sleep(WAIT_TIME)
59-
assert sp.call(['gcloud', 'dataflow', 'jobs', 'cancel', job_id]) == 0
81+
dataflow_jobs_cancel(data['job']['id'])
6082

6183

62-
def test_run_template_data(app):
84+
def test_run_template_http_data(app):
6385
args = {
6486
'project': PROJECT,
6587
'job': datetime.now().strftime('test_run_template_data-%Y%m%d-%H%M%S'),
@@ -70,12 +92,10 @@ def test_run_template_data(app):
7092
with app.test_request_context(data=args):
7193
res = main.run_template(flask.request)
7294
data = json.loads(res)
73-
job_id = data['job']['id']
74-
time.sleep(WAIT_TIME)
75-
assert sp.call(['gcloud', 'dataflow', 'jobs', 'cancel', job_id]) == 0
95+
dataflow_jobs_cancel(data['job']['id'])
7696

7797

78-
def test_run_template_json(app):
98+
def test_run_template_http_json(app):
7999
args = {
80100
'project': PROJECT,
81101
'job': datetime.now().strftime('test_run_template_json-%Y%m%d-%H%M%S'),
@@ -86,6 +106,16 @@ def test_run_template_json(app):
86106
with app.test_request_context(json=args):
87107
res = main.run_template(flask.request)
88108
data = json.loads(res)
89-
job_id = data['job']['id']
90-
time.sleep(WAIT_TIME)
91-
assert sp.call(['gcloud', 'dataflow', 'jobs', 'cancel', job_id]) == 0
109+
dataflow_jobs_cancel(data['job']['id'])
110+
111+
112+
def dataflow_jobs_cancel(job_id):
113+
# Wait time until a job can be cancelled, as a best effort.
114+
# If it fails to be cancelled, the job will run for ~8 minutes.
115+
time.sleep(5) # seconds
116+
request = dataflow.projects().jobs().update(
117+
projectId=PROJECT,
118+
jobId=job_id,
119+
body={'requestedState': 'JOB_STATE_CANCELLED'}
120+
)
121+
request.execute()
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
google-api-python-client==1.7.9
1+
google-api-python-client==1.7.11

testing/requirements.txt

Lines changed: 3 additions & 3 deletions

0 commit comments

Comments
 (0)