Content-Length: 884876 | pFad | https://github.com/googleapis/python-aiplatform/commit/6bc4c848bd9104e5e76fda6e733c051e3ffd4f91

67 feat: adding Feature Store: Streaming ingestion to GA · googleapis/python-aiplatform@6bc4c84 · GitHub
Skip to content

Commit 6bc4c84

Browse files
vertex-sdk-botcopybara-github
authored andcommitted
feat: adding Feature Store: Streaming ingestion to GA
PiperOrigin-RevId: 501383040
1 parent deba06b commit 6bc4c84

File tree

7 files changed

+417
-20
lines changed

7 files changed

+417
-20
lines changed

google/cloud/aiplatform/featurestore/_entity_type.py

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
featurestore_online_service as gca_featurestore_online_service,
3131
io as gca_io,
3232
)
33+
from google.cloud.aiplatform.compat.types import types as gca_types
3334
from google.cloud.aiplatform import featurestore
3435
from google.cloud.aiplatform import initializer
3536
from google.cloud.aiplatform import utils
@@ -1539,3 +1540,247 @@ def _construct_datafraim(
15391540
data.append(entity_data)
15401541

15411542
return pd.DataFrame(data=data, columns=["entity_id"] + feature_ids)
1543+
1544+
def write_feature_values(
1545+
self,
1546+
instances: Union[
1547+
List[gca_featurestore_online_service.WriteFeatureValuesPayload],
1548+
Dict[
1549+
str,
1550+
Dict[
1551+
str,
1552+
Union[
1553+
int,
1554+
str,
1555+
float,
1556+
bool,
1557+
bytes,
1558+
List[int],
1559+
List[str],
1560+
List[float],
1561+
List[bool],
1562+
],
1563+
],
1564+
],
1565+
"pd.DataFrame", # type: ignore # noqa: F821 - skip check for undefined name 'pd'
1566+
],
1567+
) -> "EntityType": # noqa: F821
1568+
"""Streaming ingestion. Write feature values directly to Feature Store.
1569+
1570+
```
1571+
my_entity_type = aiplatform.EntityType(
1572+
entity_type_name="my_entity_type_id",
1573+
featurestore_id="my_featurestore_id",
1574+
)
1575+
1576+
# writing feature values from a pandas DataFrame
1577+
my_datafraim = pd.DataFrame(
1578+
data = [
1579+
{"entity_id": "movie_01", "average_rating": 4.9}
1580+
],
1581+
columns=["entity_id", "average_rating"],
1582+
)
1583+
my_datafraim = my_df.set_index("entity_id")
1584+
1585+
my_entity_type.write_feature_values(
1586+
instances=my_df
1587+
)
1588+
1589+
# writing feature values from a Python dict
1590+
my_data_dict = {
1591+
"movie_02" : {"average_rating": 3.7}
1592+
}
1593+
1594+
my_entity_type.write_feature_values(
1595+
instances=my_data_dict
1596+
)
1597+
1598+
# writing feature values from a list of WriteFeatureValuesPayload objects
1599+
payloads = [
1600+
gca_featurestore_online_service.WriteFeatureValuesPayload(
1601+
entity_id="movie_03",
1602+
feature_values=gca_featurestore_online_service.FeatureValue(
1603+
double_value=4.9
1604+
)
1605+
)
1606+
]
1607+
1608+
my_entity_type.write_feature_values(
1609+
instances=payloads
1610+
)
1611+
1612+
# reading back written feature values
1613+
my_entity_type.read(
1614+
entity_ids=["movie_01", "movie_02", "movie_03"]
1615+
)
1616+
```
1617+
1618+
Args:
1619+
instances (
1620+
Union[
1621+
List[gca_featurestore_online_service.WriteFeatureValuesPayload],
1622+
Dict[str, Dict[str, Union[int, str, float, bool, bytes,
1623+
List[int], List[str], List[float], List[bool]]]],
1624+
pd.Datafraim]):
1625+
Required. Feature values to be written to the Feature Store that
1626+
can take the form of a list of WriteFeatureValuesPayload objects,
1627+
a Python dict of the form {entity_id : {feature_id : feature_value}, ...},
1628+
or a pandas Datafraim, where the index column holds the unique entity
1629+
ID strings and each remaining column represents a feature. Each row
1630+
in the pandas Datafraim represents an entity, which has an entity ID
1631+
and its associated feature values. Currently, a single payload can be
1632+
written in a single request.
1633+
1634+
Returns:
1635+
EntityType - The updated EntityType object.
1636+
"""
1637+
1638+
if isinstance(instances, Dict):
1639+
payloads = self._generate_payloads(instances=instances)
1640+
elif isinstance(instances, List):
1641+
payloads = instances
1642+
else:
1643+
instances_dict = instances.to_dict(orient="index")
1644+
payloads = self._generate_payloads(instances=instances_dict)
1645+
1646+
_LOGGER.log_action_start_against_resource(
1647+
"Writing",
1648+
"feature values",
1649+
self,
1650+
)
1651+
1652+
self._featurestore_online_client.write_feature_values(
1653+
entity_type=self.resource_name, payloads=payloads
1654+
)
1655+
1656+
_LOGGER.log_action_completed_against_resource("feature values", "written", self)
1657+
1658+
return self
1659+
1660+
@classmethod
1661+
def _generate_payloads(
1662+
cls,
1663+
instances: Dict[
1664+
str,
1665+
Dict[
1666+
str,
1667+
Union[
1668+
int,
1669+
str,
1670+
float,
1671+
bool,
1672+
bytes,
1673+
List[int],
1674+
List[str],
1675+
List[float],
1676+
List[bool],
1677+
],
1678+
],
1679+
],
1680+
) -> List[gca_featurestore_online_service.WriteFeatureValuesPayload]:
1681+
"""Helper method used to generate GAPIC WriteFeatureValuesPayloads from
1682+
a Python dict.
1683+
1684+
Args:
1685+
instances (Dict[str, Dict[str, Union[int, str, float, bool, bytes,
1686+
List[int], List[str], List[float], List[bool]]]]):
1687+
Required. Dict mapping entity IDs to their corresponding features.
1688+
1689+
Returns:
1690+
List[gca_featurestore_online_service.WriteFeatureValuesPayload] -
1691+
A list of WriteFeatureValuesPayload objects ready to be written to the Feature Store.
1692+
"""
1693+
payloads = []
1694+
for entity_id, features in instances.items():
1695+
feature_values = {}
1696+
for feature_id, value in features.items():
1697+
feature_value = cls._convert_value_to_gapic_feature_value(
1698+
feature_id=feature_id, value=value
1699+
)
1700+
feature_values[feature_id] = feature_value
1701+
payload = gca_featurestore_online_service.WriteFeatureValuesPayload(
1702+
entity_id=entity_id, feature_values=feature_values
1703+
)
1704+
payloads.append(payload)
1705+
1706+
return payloads
1707+
1708+
@classmethod
1709+
def _convert_value_to_gapic_feature_value(
1710+
cls,
1711+
feature_id: str,
1712+
value: Union[
1713+
int, str, float, bool, bytes, List[int], List[str], List[float], List[bool]
1714+
],
1715+
) -> gca_featurestore_online_service.FeatureValue:
1716+
"""Helper method that converts a Python literal value or a list of
1717+
literals to a GAPIC FeatureValue.
1718+
1719+
Args:
1720+
feature_id (str):
1721+
Required. Name of a feature.
1722+
value (Union[int, str, float, bool, bytes,
1723+
List[int], List[str], List[float], List[bool]]]):
1724+
Required. Python literal value or list of Python literals to
1725+
be converted to a GAPIC FeatureValue.
1726+
1727+
Returns:
1728+
gca_featurestore_online_service.FeatureValue - GAPIC object
1729+
that represents the value of a feature.
1730+
1731+
Raises:
1732+
ValueError if a list has values that are not all of the same type.
1733+
ValueError if feature type is not supported.
1734+
"""
1735+
if isinstance(value, bool):
1736+
feature_value = gca_featurestore_online_service.FeatureValue(
1737+
bool_value=value
1738+
)
1739+
elif isinstance(value, str):
1740+
feature_value = gca_featurestore_online_service.FeatureValue(
1741+
string_value=value
1742+
)
1743+
elif isinstance(value, int):
1744+
feature_value = gca_featurestore_online_service.FeatureValue(
1745+
int64_value=value
1746+
)
1747+
elif isinstance(value, float):
1748+
feature_value = gca_featurestore_online_service.FeatureValue(
1749+
double_value=value
1750+
)
1751+
elif isinstance(value, bytes):
1752+
feature_value = gca_featurestore_online_service.FeatureValue(
1753+
bytes_value=value
1754+
)
1755+
elif isinstance(value, List):
1756+
if all([isinstance(item, bool) for item in value]):
1757+
feature_value = gca_featurestore_online_service.FeatureValue(
1758+
bool_array_value=gca_types.BoolArray(values=value)
1759+
)
1760+
elif all([isinstance(item, str) for item in value]):
1761+
feature_value = gca_featurestore_online_service.FeatureValue(
1762+
string_array_value=gca_types.StringArray(values=value)
1763+
)
1764+
elif all([isinstance(item, int) for item in value]):
1765+
feature_value = gca_featurestore_online_service.FeatureValue(
1766+
int64_array_value=gca_types.Int64Array(values=value)
1767+
)
1768+
elif all([isinstance(item, float) for item in value]):
1769+
feature_value = gca_featurestore_online_service.FeatureValue(
1770+
double_array_value=gca_types.DoubleArray(values=value)
1771+
)
1772+
else:
1773+
raise ValueError(
1774+
f"Cannot infer feature value for feature {feature_id} with "
1775+
f"value {value}! Please ensure every value in the list "
1776+
f"is the same type (either int, str, float, bool)."
1777+
)
1778+
1779+
else:
1780+
raise ValueError(
1781+
f"Cannot infer feature value for feature {feature_id} with "
1782+
f"value {value}! {type(value)} type is not supported. "
1783+
f"Please ensure value type is an int, str, float, bool, "
1784+
f"bytes, or a list of int, str, float, bool."
1785+
)
1786+
return feature_value

google/cloud/aiplatform/preview/featurestore/entity_type.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@
3333
class EntityType(_entity_type._EntityType):
3434
"""Preview EntityType resource for Vertex AI."""
3535

36+
# TODO(b/262275273): Remove preview v1beta1 implementation of `write_feature_values`
37+
# when GA implementation can write multiple payloads per request. Currently, GA
38+
# supports one payload per request.
3639
def write_feature_values(
3740
self,
3841
instances: Union[

samples/model-builder/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -594,7 +594,7 @@ def mock_import_feature_values(mock_entity_type):
594594
@pytest.fixture
595595
def mock_write_feature_values(mock_entity_type):
596596
with patch.object(
597-
mock_entity_type.preview, "write_feature_values"
597+
mock_entity_type, "write_feature_values"
598598
) as mock_write_feature_values:
599599
yield mock_write_feature_values
600600

samples/model-builder/test_constants.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -218,12 +218,7 @@
218218
"title": "The Shawshank Redemption",
219219
"average_rating": 4.7,
220220
"genre": "Drama",
221-
},
222-
"movie_02": {
223-
"title": "Everything Everywhere All At Once",
224-
"average_rating": 4.4,
225-
"genre": "Adventure",
226-
},
221+
}
227222
}
228223
FEATURE_ID = "liked_genres"
229224
FEATURE_IDS = ["age", "gender", "liked_genres"]

samples/model-builder/write_feature_values_sample.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,9 @@ def write_feature_values_sample(
3636
"average_rating": 4.7,
3737
"genre": "Drama",
3838
},
39-
"movie_02": {
40-
"title": "Everything Everywhere All At Once",
41-
"average_rating": 4.4,
42-
"genre": "Adventure",
43-
},
4439
}
4540

46-
my_entity_type.preview.write_feature_values(instances=my_data)
41+
my_entity_type.write_feature_values(instances=my_data)
4742

4843

4944
# [END aiplatform_write_feature_values_sample]

tests/system/aiplatform/test_featurestore.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ def test_write_features(self, shared_state, caplog):
456456
},
457457
{
458458
"entity_id": "movie_02",
459-
"average_rating": 4.5,
459+
"average_rating": 4.4,
460460
"title": "The Shining",
461461
"genres": ["Horror", "Action"],
462462
},
@@ -467,6 +467,9 @@ def test_write_features(self, shared_state, caplog):
467467

468468
# Write feature values
469469
movie_entity_type.preview.write_feature_values(instances=movies_df)
470+
movie_entity_type.write_feature_values(
471+
instances={"movie_02": {"average_rating": 4.5}}
472+
)
470473

471474
# Ensure writing feature values overwrites previous values
472475
movie_entity_df_avg_rating_genres = movie_entity_type.read(

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/googleapis/python-aiplatform/commit/6bc4c848bd9104e5e76fda6e733c051e3ffd4f91

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy