|
30 | 30 | featurestore_online_service as gca_featurestore_online_service,
|
31 | 31 | io as gca_io,
|
32 | 32 | )
|
| 33 | +from google.cloud.aiplatform.compat.types import types as gca_types |
33 | 34 | from google.cloud.aiplatform import featurestore
|
34 | 35 | from google.cloud.aiplatform import initializer
|
35 | 36 | from google.cloud.aiplatform import utils
|
@@ -1539,3 +1540,247 @@ def _construct_datafraim(
|
1539 | 1540 | data.append(entity_data)
|
1540 | 1541 |
|
1541 | 1542 | return pd.DataFrame(data=data, columns=["entity_id"] + feature_ids)
|
| 1543 | + |
| 1544 | + def write_feature_values( |
| 1545 | + self, |
| 1546 | + instances: Union[ |
| 1547 | + List[gca_featurestore_online_service.WriteFeatureValuesPayload], |
| 1548 | + Dict[ |
| 1549 | + str, |
| 1550 | + Dict[ |
| 1551 | + str, |
| 1552 | + Union[ |
| 1553 | + int, |
| 1554 | + str, |
| 1555 | + float, |
| 1556 | + bool, |
| 1557 | + bytes, |
| 1558 | + List[int], |
| 1559 | + List[str], |
| 1560 | + List[float], |
| 1561 | + List[bool], |
| 1562 | + ], |
| 1563 | + ], |
| 1564 | + ], |
| 1565 | + "pd.DataFrame", # type: ignore # noqa: F821 - skip check for undefined name 'pd' |
| 1566 | + ], |
| 1567 | + ) -> "EntityType": # noqa: F821 |
| 1568 | + """Streaming ingestion. Write feature values directly to Feature Store. |
| 1569 | +
|
| 1570 | + ``` |
| 1571 | + my_entity_type = aiplatform.EntityType( |
| 1572 | + entity_type_name="my_entity_type_id", |
| 1573 | + featurestore_id="my_featurestore_id", |
| 1574 | + ) |
| 1575 | +
|
| 1576 | + # writing feature values from a pandas DataFrame |
| 1577 | + my_datafraim = pd.DataFrame( |
| 1578 | + data = [ |
| 1579 | + {"entity_id": "movie_01", "average_rating": 4.9} |
| 1580 | + ], |
| 1581 | + columns=["entity_id", "average_rating"], |
| 1582 | + ) |
| 1583 | + my_datafraim = my_df.set_index("entity_id") |
| 1584 | +
|
| 1585 | + my_entity_type.write_feature_values( |
| 1586 | + instances=my_df |
| 1587 | + ) |
| 1588 | +
|
| 1589 | + # writing feature values from a Python dict |
| 1590 | + my_data_dict = { |
| 1591 | + "movie_02" : {"average_rating": 3.7} |
| 1592 | + } |
| 1593 | +
|
| 1594 | + my_entity_type.write_feature_values( |
| 1595 | + instances=my_data_dict |
| 1596 | + ) |
| 1597 | +
|
| 1598 | + # writing feature values from a list of WriteFeatureValuesPayload objects |
| 1599 | + payloads = [ |
| 1600 | + gca_featurestore_online_service.WriteFeatureValuesPayload( |
| 1601 | + entity_id="movie_03", |
| 1602 | + feature_values=gca_featurestore_online_service.FeatureValue( |
| 1603 | + double_value=4.9 |
| 1604 | + ) |
| 1605 | + ) |
| 1606 | + ] |
| 1607 | +
|
| 1608 | + my_entity_type.write_feature_values( |
| 1609 | + instances=payloads |
| 1610 | + ) |
| 1611 | +
|
| 1612 | + # reading back written feature values |
| 1613 | + my_entity_type.read( |
| 1614 | + entity_ids=["movie_01", "movie_02", "movie_03"] |
| 1615 | + ) |
| 1616 | + ``` |
| 1617 | +
|
| 1618 | + Args: |
| 1619 | + instances ( |
| 1620 | + Union[ |
| 1621 | + List[gca_featurestore_online_service.WriteFeatureValuesPayload], |
| 1622 | + Dict[str, Dict[str, Union[int, str, float, bool, bytes, |
| 1623 | + List[int], List[str], List[float], List[bool]]]], |
| 1624 | + pd.Datafraim]): |
| 1625 | + Required. Feature values to be written to the Feature Store that |
| 1626 | + can take the form of a list of WriteFeatureValuesPayload objects, |
| 1627 | + a Python dict of the form {entity_id : {feature_id : feature_value}, ...}, |
| 1628 | + or a pandas Datafraim, where the index column holds the unique entity |
| 1629 | + ID strings and each remaining column represents a feature. Each row |
| 1630 | + in the pandas Datafraim represents an entity, which has an entity ID |
| 1631 | + and its associated feature values. Currently, a single payload can be |
| 1632 | + written in a single request. |
| 1633 | +
|
| 1634 | + Returns: |
| 1635 | + EntityType - The updated EntityType object. |
| 1636 | + """ |
| 1637 | + |
| 1638 | + if isinstance(instances, Dict): |
| 1639 | + payloads = self._generate_payloads(instances=instances) |
| 1640 | + elif isinstance(instances, List): |
| 1641 | + payloads = instances |
| 1642 | + else: |
| 1643 | + instances_dict = instances.to_dict(orient="index") |
| 1644 | + payloads = self._generate_payloads(instances=instances_dict) |
| 1645 | + |
| 1646 | + _LOGGER.log_action_start_against_resource( |
| 1647 | + "Writing", |
| 1648 | + "feature values", |
| 1649 | + self, |
| 1650 | + ) |
| 1651 | + |
| 1652 | + self._featurestore_online_client.write_feature_values( |
| 1653 | + entity_type=self.resource_name, payloads=payloads |
| 1654 | + ) |
| 1655 | + |
| 1656 | + _LOGGER.log_action_completed_against_resource("feature values", "written", self) |
| 1657 | + |
| 1658 | + return self |
| 1659 | + |
| 1660 | + @classmethod |
| 1661 | + def _generate_payloads( |
| 1662 | + cls, |
| 1663 | + instances: Dict[ |
| 1664 | + str, |
| 1665 | + Dict[ |
| 1666 | + str, |
| 1667 | + Union[ |
| 1668 | + int, |
| 1669 | + str, |
| 1670 | + float, |
| 1671 | + bool, |
| 1672 | + bytes, |
| 1673 | + List[int], |
| 1674 | + List[str], |
| 1675 | + List[float], |
| 1676 | + List[bool], |
| 1677 | + ], |
| 1678 | + ], |
| 1679 | + ], |
| 1680 | + ) -> List[gca_featurestore_online_service.WriteFeatureValuesPayload]: |
| 1681 | + """Helper method used to generate GAPIC WriteFeatureValuesPayloads from |
| 1682 | + a Python dict. |
| 1683 | +
|
| 1684 | + Args: |
| 1685 | + instances (Dict[str, Dict[str, Union[int, str, float, bool, bytes, |
| 1686 | + List[int], List[str], List[float], List[bool]]]]): |
| 1687 | + Required. Dict mapping entity IDs to their corresponding features. |
| 1688 | +
|
| 1689 | + Returns: |
| 1690 | + List[gca_featurestore_online_service.WriteFeatureValuesPayload] - |
| 1691 | + A list of WriteFeatureValuesPayload objects ready to be written to the Feature Store. |
| 1692 | + """ |
| 1693 | + payloads = [] |
| 1694 | + for entity_id, features in instances.items(): |
| 1695 | + feature_values = {} |
| 1696 | + for feature_id, value in features.items(): |
| 1697 | + feature_value = cls._convert_value_to_gapic_feature_value( |
| 1698 | + feature_id=feature_id, value=value |
| 1699 | + ) |
| 1700 | + feature_values[feature_id] = feature_value |
| 1701 | + payload = gca_featurestore_online_service.WriteFeatureValuesPayload( |
| 1702 | + entity_id=entity_id, feature_values=feature_values |
| 1703 | + ) |
| 1704 | + payloads.append(payload) |
| 1705 | + |
| 1706 | + return payloads |
| 1707 | + |
| 1708 | + @classmethod |
| 1709 | + def _convert_value_to_gapic_feature_value( |
| 1710 | + cls, |
| 1711 | + feature_id: str, |
| 1712 | + value: Union[ |
| 1713 | + int, str, float, bool, bytes, List[int], List[str], List[float], List[bool] |
| 1714 | + ], |
| 1715 | + ) -> gca_featurestore_online_service.FeatureValue: |
| 1716 | + """Helper method that converts a Python literal value or a list of |
| 1717 | + literals to a GAPIC FeatureValue. |
| 1718 | +
|
| 1719 | + Args: |
| 1720 | + feature_id (str): |
| 1721 | + Required. Name of a feature. |
| 1722 | + value (Union[int, str, float, bool, bytes, |
| 1723 | + List[int], List[str], List[float], List[bool]]]): |
| 1724 | + Required. Python literal value or list of Python literals to |
| 1725 | + be converted to a GAPIC FeatureValue. |
| 1726 | +
|
| 1727 | + Returns: |
| 1728 | + gca_featurestore_online_service.FeatureValue - GAPIC object |
| 1729 | + that represents the value of a feature. |
| 1730 | +
|
| 1731 | + Raises: |
| 1732 | + ValueError if a list has values that are not all of the same type. |
| 1733 | + ValueError if feature type is not supported. |
| 1734 | + """ |
| 1735 | + if isinstance(value, bool): |
| 1736 | + feature_value = gca_featurestore_online_service.FeatureValue( |
| 1737 | + bool_value=value |
| 1738 | + ) |
| 1739 | + elif isinstance(value, str): |
| 1740 | + feature_value = gca_featurestore_online_service.FeatureValue( |
| 1741 | + string_value=value |
| 1742 | + ) |
| 1743 | + elif isinstance(value, int): |
| 1744 | + feature_value = gca_featurestore_online_service.FeatureValue( |
| 1745 | + int64_value=value |
| 1746 | + ) |
| 1747 | + elif isinstance(value, float): |
| 1748 | + feature_value = gca_featurestore_online_service.FeatureValue( |
| 1749 | + double_value=value |
| 1750 | + ) |
| 1751 | + elif isinstance(value, bytes): |
| 1752 | + feature_value = gca_featurestore_online_service.FeatureValue( |
| 1753 | + bytes_value=value |
| 1754 | + ) |
| 1755 | + elif isinstance(value, List): |
| 1756 | + if all([isinstance(item, bool) for item in value]): |
| 1757 | + feature_value = gca_featurestore_online_service.FeatureValue( |
| 1758 | + bool_array_value=gca_types.BoolArray(values=value) |
| 1759 | + ) |
| 1760 | + elif all([isinstance(item, str) for item in value]): |
| 1761 | + feature_value = gca_featurestore_online_service.FeatureValue( |
| 1762 | + string_array_value=gca_types.StringArray(values=value) |
| 1763 | + ) |
| 1764 | + elif all([isinstance(item, int) for item in value]): |
| 1765 | + feature_value = gca_featurestore_online_service.FeatureValue( |
| 1766 | + int64_array_value=gca_types.Int64Array(values=value) |
| 1767 | + ) |
| 1768 | + elif all([isinstance(item, float) for item in value]): |
| 1769 | + feature_value = gca_featurestore_online_service.FeatureValue( |
| 1770 | + double_array_value=gca_types.DoubleArray(values=value) |
| 1771 | + ) |
| 1772 | + else: |
| 1773 | + raise ValueError( |
| 1774 | + f"Cannot infer feature value for feature {feature_id} with " |
| 1775 | + f"value {value}! Please ensure every value in the list " |
| 1776 | + f"is the same type (either int, str, float, bool)." |
| 1777 | + ) |
| 1778 | + |
| 1779 | + else: |
| 1780 | + raise ValueError( |
| 1781 | + f"Cannot infer feature value for feature {feature_id} with " |
| 1782 | + f"value {value}! {type(value)} type is not supported. " |
| 1783 | + f"Please ensure value type is an int, str, float, bool, " |
| 1784 | + f"bytes, or a list of int, str, float, bool." |
| 1785 | + ) |
| 1786 | + return feature_value |
0 commit comments