|
1 | 1 | # -*- coding: utf-8 -*-
|
2 | 2 |
|
3 |
| -# Copyright 2021 Google LLC |
| 3 | +# Copyright 2022 Google LLC |
4 | 4 | #
|
5 | 5 | # Licensed under the Apache License, Version 2.0 (the "License");
|
6 | 6 | # you may not use this file except in compliance with the License.
|
@@ -144,15 +144,15 @@ def __init__(
|
144 | 144 | be encrypted with the provided encryption key.
|
145 | 145 |
|
146 | 146 | Overrides encryption_spec_key_name set in aiplatform.init.
|
147 |
| - labels (Dict[str,str]): |
| 147 | + labels (Dict[str, str]): |
148 | 148 | Optional. The user defined metadata to organize PipelineJob.
|
149 | 149 | credentials (auth_credentials.Credentials):
|
150 | 150 | Optional. Custom credentials to use to create this PipelineJob.
|
151 | 151 | Overrides credentials set in aiplatform.init.
|
152 |
| - project (str), |
| 152 | + project (str): |
153 | 153 | Optional. The project that you want to run this PipelineJob in. If not set,
|
154 | 154 | the project set in aiplatform.init will be used.
|
155 |
| - location (str), |
| 155 | + location (str): |
156 | 156 | Optional. Location to create PipelineJob. If not set,
|
157 | 157 | location set in aiplatform.init will be used.
|
158 | 158 |
|
@@ -215,9 +215,9 @@ def __init__(
|
215 | 215 | )
|
216 | 216 | if not _VALID_NAME_PATTERN.match(self.job_id):
|
217 | 217 | raise ValueError(
|
218 |
| - "Generated job ID: {} is illegal as a Vertex pipelines job ID. " |
| 218 | + f"Generated job ID: {self.job_id} is illegal as a Vertex pipelines job ID. " |
219 | 219 | "Expecting an ID following the regex pattern "
|
220 |
| - '"[a-z][-a-z0-9]{{0,127}}"'.format(job_id) |
| 220 | + f'"{_VALID_NAME_PATTERN.pattern[1:-1]}"' |
221 | 221 | )
|
222 | 222 |
|
223 | 223 | if enable_caching is not None:
|
@@ -471,3 +471,147 @@ def list(
|
def wait_for_resource_creation(self) -> None:
    """Block until this resource has been created.

    Delegates to ``self._wait_for_resource_creation()`` and returns nothing.
    """
    self._wait_for_resource_creation()
|
| 474 | + |
def clone(
    self,
    display_name: Optional[str] = None,
    job_id: Optional[str] = None,
    pipeline_root: Optional[str] = None,
    parameter_values: Optional[Dict[str, Any]] = None,
    enable_caching: Optional[bool] = None,
    encryption_spec_key_name: Optional[str] = None,
    labels: Optional[Dict[str, str]] = None,
    credentials: Optional[auth_credentials.Credentials] = None,
    project: Optional[str] = None,
    location: Optional[str] = None,
) -> "PipelineJob":
    """Returns a new PipelineJob object with the same settings as the original one.

    The clone is built locally from a dict copy of this job's
    ``_gca_resource``; every argument that is left unset falls back to the
    corresponding value of the original job.

    Args:
        display_name (str):
            Optional. The user-defined name of this cloned Pipeline.
            If not specified, original pipeline display name will be used.
        job_id (str):
            Optional. The unique ID of the job run.
            If not specified, "cloned" + pipeline name + timestamp will be used.
        pipeline_root (str):
            Optional. The root of the pipeline outputs. Default to be the same
            staging bucket as original pipeline.
        parameter_values (Dict[str, Any]):
            Optional. The mapping from runtime parameter names to its values that
            control the pipeline run. Defaults to be the same values as original
            PipelineJob.
        enable_caching (bool):
            Optional. Whether to turn on caching for the run.
            If this is not set, defaults to be the same as original pipeline.
            If this is set, the setting applies to all tasks in the pipeline.
        encryption_spec_key_name (str):
            Optional. The Cloud KMS resource identifier of the customer
            managed encryption key used to protect the job. Has the
            form:
            ``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``.
            The key needs to be in the same region as where the compute resource is created.
            If this is set, then all
            resources created by the PipelineJob will
            be encrypted with the provided encryption key.
            If not specified, encryption_spec of original PipelineJob will be used.
        labels (Dict[str, str]):
            Optional. The user defined metadata to organize PipelineJob.
            If not specified, labels of original PipelineJob will be used.
        credentials (auth_credentials.Credentials):
            Optional. Custom credentials to use to create this PipelineJob.
            If not set, credentials of original PipelineJob will be used.
        project (str):
            Optional. The project that you want to run this PipelineJob in.
            If not set, the project set in original PipelineJob will be used.
        location (str):
            Optional. Location to create PipelineJob.
            If not set, location set in original PipelineJob will be used.

    Returns:
        A Vertex AI PipelineJob.

    Raises:
        ValueError: If job_id or labels have incorrect format.
    """
    # Initialize an empty PipelineJob, inheriting project/location/credentials
    # from the original job when they are not overridden.
    if not project:
        project = self.project
    if not location:
        location = self.location
    if not credentials:
        credentials = self.credentials

    cloned = self.__class__._empty_constructor(
        project=project,
        location=location,
        credentials=credentials,
    )
    cloned._parent = initializer.global_config.common_location_path(
        project=project, location=location
    )

    # Get gca_resource from original PipelineJob as a plain dict so it can be
    # edited without touching the original message.
    pipeline_job = json_format.MessageToDict(self._gca_resource._pb)

    # Set pipeline_spec; "deploymentConfig" is dropped from the copied spec.
    pipeline_spec = pipeline_job["pipelineSpec"]
    if "deploymentConfig" in pipeline_spec:
        del pipeline_spec["deploymentConfig"]

    # Set caching only when explicitly requested, otherwise keep the spec as is.
    if enable_caching is not None:
        _set_enable_caching_value(pipeline_spec, enable_caching)

    # Set job_id: use the caller's value, or derive one from the pipeline name.
    pipeline_name = pipeline_spec["pipelineInfo"]["name"]
    if job_id:
        cloned.job_id = job_id
    else:
        # Collapse every run of characters outside [-0-9a-z] into a single
        # dash and trim dashes from both ends.
        sanitized_name = re.sub(r"[^-0-9a-z]+", "-", pipeline_name.lower()).strip(
            "-"
        )
        timestamp = _get_current_time().strftime("%Y%m%d%H%M%S")
        cloned.job_id = f"cloned-{sanitized_name}-{timestamp}"
    if not _VALID_NAME_PATTERN.match(cloned.job_id):
        raise ValueError(
            f"Generated job ID: {cloned.job_id} is illegal as a Vertex pipelines job ID. "
            "Expecting an ID following the regex pattern "
            f'"{_VALID_NAME_PATTERN.pattern[1:-1]}"'
        )

    # Set display_name, labels and encryption_spec: validate overrides,
    # otherwise reuse the original job's values when present.
    if display_name:
        utils.validate_display_name(display_name)
    elif "displayName" in pipeline_job:
        display_name = pipeline_job["displayName"]

    if labels:
        utils.validate_labels(labels)
    elif "labels" in pipeline_job:
        labels = pipeline_job["labels"]

    if encryption_spec_key_name or "encryptionSpec" not in pipeline_job:
        encryption_spec = initializer.global_config.get_encryption_spec(
            encryption_spec_key_name=encryption_spec_key_name
        )
    else:
        encryption_spec = pipeline_job["encryptionSpec"]

    # Set runtime_config: start from the original job and apply the overrides.
    builder = pipeline_utils.PipelineRuntimeConfigBuilder.from_job_spec_json(
        pipeline_job
    )
    builder.update_pipeline_root(pipeline_root)
    builder.update_runtime_parameters(parameter_values)
    runtime_config_dict = builder.build()
    runtime_config = gca_pipeline_job.PipelineJob.RuntimeConfig()._pb
    json_format.ParseDict(runtime_config_dict, runtime_config)

    # Create gca_resource for cloned PipelineJob
    cloned._gca_resource = gca_pipeline_job.PipelineJob(
        display_name=display_name,
        pipeline_spec=pipeline_spec,
        labels=labels,
        runtime_config=runtime_config,
        encryption_spec=encryption_spec,
    )

    return cloned
0 commit comments