Content-Length: 478395 | pFad | https://github.com/googleapis/python-aiplatform/commit/c10923b47b9b9941d14ae2c5398348d971a23f9d

B0 fix: show logs when TFX pipelines are submitted (#976) · googleapis/python-aiplatform@c10923b · GitHub
Skip to content

Commit c10923b

Browse files
authored
fix: show logs when TFX pipelines are submitted (#976)
* Fix for showing logs when TFX pipelines are submitted * Add sdkVersion and TFX spec to tests * Remove params from tfx pipeline spec * Add new tests for TFX pipelines * Update tests after linting
1 parent 1c34154 commit c10923b

File tree

2 files changed

+91
-0
lines changed

2 files changed

+91
-0
lines changed

google/cloud/aiplatform/pipeline_jobs.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#
1717

1818
import datetime
19+
import logging
1920
import time
2021
import re
2122
from typing import Any, Dict, List, Optional
@@ -273,6 +274,10 @@ def submit(
273274
if network:
274275
self._gca_resource.network = network
275276

277+
# Prevents logs from being supressed on TFX pipelines
278+
if self._gca_resource.pipeline_spec.get("sdkVersion", "").startswith("tfx"):
279+
_LOGGER.setLevel(logging.INFO)
280+
276281
_LOGGER.log_create_with_lro(self.__class__)
277282

278283
self._gca_resource = self.api_client.create_pipeline_job(

tests/unit/aiplatform/test_pipeline_jobs.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,16 @@
9292
"schemaVersion": "2.1.0",
9393
"components": {},
9494
}
95+
_TEST_TFX_PIPELINE_SPEC = {
96+
"pipelineInfo": {"name": "my-pipeline"},
97+
"root": {
98+
"dag": {"tasks": {}},
99+
"inputDefinitions": {"parameters": {"string_param": {"type": "STRING"}}},
100+
},
101+
"schemaVersion": "2.0.0",
102+
"sdkVersion": "tfx-1.4.0",
103+
"components": {},
104+
}
95105

96106
_TEST_PIPELINE_JOB_LEGACY = {
97107
"runtimeConfig": {},
@@ -101,6 +111,10 @@
101111
"runtimeConfig": {"parameterValues": _TEST_PIPELINE_PARAMETER_VALUES},
102112
"pipelineSpec": _TEST_PIPELINE_SPEC,
103113
}
114+
_TEST_PIPELINE_JOB_TFX = {
115+
"runtimeConfig": {},
116+
"pipelineSpec": _TEST_TFX_PIPELINE_SPEC,
117+
}
104118

105119
_TEST_PIPELINE_GET_METHOD_NAME = "get_fake_pipeline_job"
106120
_TEST_PIPELINE_LIST_METHOD_NAME = "list_fake_pipeline_jobs"
@@ -378,6 +392,78 @@ def test_run_call_pipeline_service_create_legacy(
378392
gca_pipeline_state_v1.PipelineState.PIPELINE_STATE_SUCCEEDED
379393
)
380394

395+
@pytest.mark.parametrize(
396+
"job_spec_json", [_TEST_TFX_PIPELINE_SPEC, _TEST_PIPELINE_JOB_TFX],
397+
)
398+
@pytest.mark.parametrize("sync", [True, False])
399+
def test_run_call_pipeline_service_create_tfx(
400+
self,
401+
mock_pipeline_service_create,
402+
mock_pipeline_service_get,
403+
job_spec_json,
404+
mock_load_json,
405+
sync,
406+
):
407+
aiplatform.init(
408+
project=_TEST_PROJECT,
409+
staging_bucket=_TEST_GCS_BUCKET_NAME,
410+
location=_TEST_LOCATION,
411+
credentials=_TEST_CREDENTIALS,
412+
)
413+
414+
job = pipeline_jobs.PipelineJob(
415+
display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
416+
template_path=_TEST_TEMPLATE_PATH,
417+
job_id=_TEST_PIPELINE_JOB_ID,
418+
parameter_values=_TEST_PIPELINE_PARAMETER_VALUES_LEGACY,
419+
enable_caching=True,
420+
)
421+
422+
job.run(
423+
service_account=_TEST_SERVICE_ACCOUNT, network=_TEST_NETWORK, sync=sync,
424+
)
425+
426+
if not sync:
427+
job.wait()
428+
429+
expected_runtime_config_dict = {
430+
"gcsOutputDirectory": _TEST_GCS_BUCKET_NAME,
431+
"parameters": {"string_param": {"stringValue": "hello"}},
432+
}
433+
runtime_config = gca_pipeline_job_v1.PipelineJob.RuntimeConfig()._pb
434+
json_format.ParseDict(expected_runtime_config_dict, runtime_config)
435+
436+
pipeline_spec = job_spec_json.get("pipelineSpec") or job_spec_json
437+
438+
# Construct expected request
439+
expected_gapic_pipeline_job = gca_pipeline_job_v1.PipelineJob(
440+
display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
441+
pipeline_spec={
442+
"components": {},
443+
"pipelineInfo": pipeline_spec["pipelineInfo"],
444+
"root": pipeline_spec["root"],
445+
"schemaVersion": "2.0.0",
446+
"sdkVersion": "tfx-1.4.0",
447+
},
448+
runtime_config=runtime_config,
449+
service_account=_TEST_SERVICE_ACCOUNT,
450+
network=_TEST_NETWORK,
451+
)
452+
453+
mock_pipeline_service_create.assert_called_once_with(
454+
parent=_TEST_PARENT,
455+
pipeline_job=expected_gapic_pipeline_job,
456+
pipeline_job_id=_TEST_PIPELINE_JOB_ID,
457+
)
458+
459+
mock_pipeline_service_get.assert_called_with(
460+
name=_TEST_PIPELINE_JOB_NAME, retry=base._DEFAULT_RETRY
461+
)
462+
463+
assert job._gca_resource == make_pipeline_job(
464+
gca_pipeline_state_v1.PipelineState.PIPELINE_STATE_SUCCEEDED
465+
)
466+
381467
@pytest.mark.parametrize(
382468
"job_spec_json", [_TEST_PIPELINE_SPEC, _TEST_PIPELINE_JOB],
383469
)

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/googleapis/python-aiplatform/commit/c10923b47b9b9941d14ae2c5398348d971a23f9d

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy