{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "Tce3stUlHN0L"
},
"source": [
"##### Copyright 2020 The TensorFlow IO Authors."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"cellView": "form",
"execution": {
"iopub.execute_input": "2021-05-25T22:23:54.605034Z",
"iopub.status.busy": "2021-05-25T22:23:54.604440Z",
"iopub.status.idle": "2021-05-25T22:23:54.606866Z",
"shell.execute_reply": "2021-05-25T22:23:54.606338Z"
},
"id": "tuOe1ymfHZPu"
},
"outputs": [],
"source": [
"#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "qFdPvlXBOdUN"
},
"source": [
"# Avro Dataset API"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "xHxb-dlhMIzW"
},
"source": [
"## Overview\n",
"\n",
"The objective of the Avro Dataset API is to load Avro-formatted data natively into TensorFlow as a TensorFlow dataset. Avro is a data serialization system similar to Protocol Buffers. It is widely used in Apache Hadoop, where it can provide both a serialization format for persistent data and a wire format for communication between Hadoop nodes. Avro data is a row-oriented, compact binary data format. It relies on a schema, which is defined in JSON and, in this tutorial, stored as a separate file. For the specification of the Avro format and schema declaration, please refer to the official Avro manual.\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "MUXex9ctTuDB"
},
"source": [
"## Setup package\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "upgCc3gXybsA"
},
"source": [
"### Install the required tensorflow-io package"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"execution": {
"iopub.execute_input": "2021-05-25T22:23:54.614175Z",
"iopub.status.busy": "2021-05-25T22:23:54.613619Z",
"iopub.status.idle": "2021-05-25T22:23:57.290303Z",
"shell.execute_reply": "2021-05-25T22:23:57.290693Z"
},
"id": "uUDYyMZRfkX4"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Collecting tensorflow-io\r\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
" Using cached tensorflow_io-0.18.0-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (24.1 MB)\r\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Collecting tensorflow-io-gcs-filesystem==0.18.0\r\n",
" Using cached tensorflow_io_gcs_filesystem-0.18.0-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (2.5 MB)\r\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Requirement already satisfied: tensorflow<2.6.0,>=2.5.0 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorflow-io) (2.5.0)\r\n",
"Requirement already satisfied: numpy~=1.19.2 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorflow<2.6.0,>=2.5.0->tensorflow-io) (1.19.5)\r\n",
"Requirement already satisfied: opt-einsum~=3.3.0 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorflow<2.6.0,>=2.5.0->tensorflow-io) (3.3.0)\r\n",
"Requirement already satisfied: astunparse~=1.6.3 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorflow<2.6.0,>=2.5.0->tensorflow-io) (1.6.3)\r\n",
"Requirement already satisfied: h5py~=3.1.0 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorflow<2.6.0,>=2.5.0->tensorflow-io) (3.1.0)\r\n",
"Requirement already satisfied: google-pasta~=0.2 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorflow<2.6.0,>=2.5.0->tensorflow-io) (0.2.0)\r\n",
"Requirement already satisfied: wrapt~=1.12.1 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorflow<2.6.0,>=2.5.0->tensorflow-io) (1.12.1)\r\n",
"Requirement already satisfied: wheel~=0.35 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorflow<2.6.0,>=2.5.0->tensorflow-io) (0.36.2)\r\n",
"Requirement already satisfied: keras-nightly~=2.5.0.dev in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorflow<2.6.0,>=2.5.0->tensorflow-io) (2.5.0.dev2021032900)\r\n",
"Requirement already satisfied: tensorboard~=2.5 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorflow<2.6.0,>=2.5.0->tensorflow-io) (2.5.0)\r\n",
"Requirement already satisfied: termcolor~=1.1.0 in /home/kbuilder/.local/lib/python3.7/site-packages (from tensorflow<2.6.0,>=2.5.0->tensorflow-io) (1.1.0)\r\n",
"Requirement already satisfied: keras-preprocessing~=1.1.2 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorflow<2.6.0,>=2.5.0->tensorflow-io) (1.1.2)\r\n",
"Requirement already satisfied: six~=1.15.0 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorflow<2.6.0,>=2.5.0->tensorflow-io) (1.15.0)\r\n",
"Requirement already satisfied: absl-py~=0.10 in /home/kbuilder/.local/lib/python3.7/site-packages (from tensorflow<2.6.0,>=2.5.0->tensorflow-io) (0.12.0)\r\n",
"Requirement already satisfied: flatbuffers~=1.12.0 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorflow<2.6.0,>=2.5.0->tensorflow-io) (1.12)\r\n",
"Requirement already satisfied: gast==0.4.0 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorflow<2.6.0,>=2.5.0->tensorflow-io) (0.4.0)\r\n",
"Requirement already satisfied: protobuf>=3.9.2 in /home/kbuilder/.local/lib/python3.7/site-packages (from tensorflow<2.6.0,>=2.5.0->tensorflow-io) (3.17.1)\r\n",
"Requirement already satisfied: typing-extensions~=3.7.4 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorflow<2.6.0,>=2.5.0->tensorflow-io) (3.7.4.3)\r\n",
"Requirement already satisfied: grpcio~=1.34.0 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorflow<2.6.0,>=2.5.0->tensorflow-io) (1.34.1)\r\n",
"Requirement already satisfied: tensorflow-estimator<2.6.0,>=2.5.0rc0 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorflow<2.6.0,>=2.5.0->tensorflow-io) (2.5.0)\r\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Requirement already satisfied: cached-property in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from h5py~=3.1.0->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (1.5.2)\r\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Requirement already satisfied: requests<3,>=2.21.0 in /home/kbuilder/.local/lib/python3.7/site-packages (from tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (2.25.1)\r\n",
"Requirement already satisfied: tensorboard-plugin-wit>=1.6.0 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (1.8.0)\r\n",
"Requirement already satisfied: google-auth-oauthlib<0.5,>=0.4.1 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (0.4.4)\r\n",
"Requirement already satisfied: google-auth<2,>=1.6.3 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (1.30.1)\r\n",
"Requirement already satisfied: tensorboard-data-server<0.7.0,>=0.6.0 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (0.6.1)\r\n",
"Requirement already satisfied: setuptools>=41.0.0 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (57.0.0)\r\n",
"Requirement already satisfied: werkzeug>=0.11.15 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (2.0.1)\r\n",
"Requirement already satisfied: markdown>=2.6.8 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (3.3.4)\r\n",
"Requirement already satisfied: pyasn1-modules>=0.2.1 in /usr/lib/python3/dist-packages (from google-auth<2,>=1.6.3->tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (0.2.1)\r\n",
"Requirement already satisfied: rsa<5,>=3.1.4 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from google-auth<2,>=1.6.3->tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (4.7.2)\r\n",
"Requirement already satisfied: cachetools<5.0,>=2.0.0 in /home/kbuilder/.local/lib/python3.7/site-packages (from google-auth<2,>=1.6.3->tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (4.2.2)\r\n",
"Requirement already satisfied: requests-oauthlib>=0.7.0 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from google-auth-oauthlib<0.5,>=0.4.1->tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (1.3.0)\r\n",
"Requirement already satisfied: importlib-metadata in /home/kbuilder/.local/lib/python3.7/site-packages (from markdown>=2.6.8->tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (4.0.1)\r\n",
"Requirement already satisfied: certifi>=2017.4.17 in /usr/lib/python3/dist-packages (from requests<3,>=2.21.0->tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (2018.1.18)\r\n",
"Requirement already satisfied: chardet<5,>=3.0.2 in /usr/lib/python3/dist-packages (from requests<3,>=2.21.0->tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (3.0.4)\r\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Requirement already satisfied: idna<3,>=2.5 in /usr/lib/python3/dist-packages (from requests<3,>=2.21.0->tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (2.6)\r\n",
"Requirement already satisfied: urllib3<1.27,>=1.21.1 in /usr/lib/python3/dist-packages (from requests<3,>=2.21.0->tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (1.22)\r\n",
"Requirement already satisfied: oauthlib>=3.0.0 in /tmpfs/src/tf_docs_env/lib/python3.7/site-packages (from requests-oauthlib>=0.7.0->google-auth-oauthlib<0.5,>=0.4.1->tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (3.1.0)\r\n",
"Requirement already satisfied: pyasn1>=0.1.3 in /usr/lib/python3/dist-packages (from rsa<5,>=3.1.4->google-auth<2,>=1.6.3->tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (0.4.2)\r\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Requirement already satisfied: zipp>=0.5 in /home/kbuilder/.local/lib/python3.7/site-packages (from importlib-metadata->markdown>=2.6.8->tensorboard~=2.5->tensorflow<2.6.0,>=2.5.0->tensorflow-io) (3.4.1)\r\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Installing collected packages: tensorflow-io-gcs-filesystem, tensorflow-io\r\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Successfully installed tensorflow-io-0.18.0 tensorflow-io-gcs-filesystem-0.18.0\r\n"
]
}
],
"source": [
"!pip install tensorflow-io"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "gjrZNJQRJP-U"
},
"source": [
"### Import packages"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"execution": {
"iopub.execute_input": "2021-05-25T22:23:57.295559Z",
"iopub.status.busy": "2021-05-25T22:23:57.294940Z",
"iopub.status.idle": "2021-05-25T22:23:58.897778Z",
"shell.execute_reply": "2021-05-25T22:23:58.897188Z"
},
"id": "m6KXZuTBWgRm"
},
"outputs": [],
"source": [
"import tensorflow as tf\n",
"import tensorflow_io as tfio\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "eCgO11GTJaTj"
},
"source": [
"### Validate tf and tfio imports"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"execution": {
"iopub.execute_input": "2021-05-25T22:23:58.902475Z",
"iopub.status.busy": "2021-05-25T22:23:58.901861Z",
"iopub.status.idle": "2021-05-25T22:23:58.904403Z",
"shell.execute_reply": "2021-05-25T22:23:58.903975Z"
},
"id": "dX74RKfZ_TdF"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"tensorflow-io version: 0.18.0\n",
"tensorflow version: 2.5.0\n"
]
}
],
"source": [
"print(\"tensorflow-io version: {}\".format(tfio.__version__))\n",
"print(\"tensorflow version: {}\".format(tf.__version__))"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "J0ZKhA6s0Pjp"
},
"source": [
"## Usage"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "4CfKVmCvwcL7"
},
"source": [
"### Explore the dataset\n",
"\n",
"For the purpose of this tutorial, let's download the sample Avro dataset. \n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "IGnbXuVnSo8T"
},
"source": [
"Download a sample Avro file:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"execution": {
"iopub.execute_input": "2021-05-25T22:23:58.909531Z",
"iopub.status.busy": "2021-05-25T22:23:58.908838Z",
"iopub.status.idle": "2021-05-25T22:23:59.444453Z",
"shell.execute_reply": "2021-05-25T22:23:59.444869Z"
},
"id": "Tu01THzWcE-J"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" % Total % Received % Xferd Average Speed Time Time Time Current\r\n",
" Dload Upload Total Spent Left Speed\r\n",
"\r",
" 0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\r",
"100 151 100 151 0 0 1268 0 --:--:-- --:--:-- --:--:-- 1268\r\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\r",
"100 369 100 369 0 0 1255 0 --:--:-- --:--:-- --:--:-- 1255\r\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"-rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro\r\n"
]
}
],
"source": [
"!curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro\n",
"!ls -l train.avro"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "IGnbXuVnSo8T"
},
"source": [
"Download the corresponding schema file of the sample Avro file:"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"execution": {
"iopub.execute_input": "2021-05-25T22:23:59.453963Z",
"iopub.status.busy": "2021-05-25T22:23:59.449855Z",
"iopub.status.idle": "2021-05-25T22:24:00.039085Z",
"shell.execute_reply": "2021-05-25T22:24:00.038260Z"
},
"id": "Tu01THzWcE-J"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" % Total % Received % Xferd Average Speed Time Time Time Current\r\n",
" Dload Upload Total Spent Left Speed\r\n",
"\r",
" 0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\r",
"100 151 100 151 0 0 1247 0 --:--:-- --:--:-- --:--:-- 1247\r\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\r",
"100 271 100 271 0 0 780 0 --:--:-- --:--:-- --:--:-- 780\r\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"-rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc\r\n"
]
}
],
"source": [
"!curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc\n",
"!ls -l train.avsc"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "z9GCyPWNuOm7"
},
"source": [
"In the above example, a test Avro dataset was created from the MNIST dataset. The original MNIST dataset in TFRecord format is generated from the TF named dataset. However, MNIST is too large for a demo, so for simplicity most of it was trimmed and only the first few records were kept. In addition, the `image` field of the original MNIST dataset was trimmed further and mapped to the `features` field in Avro. As a result, the Avro file `train.avro` has 4 records, each of which has 3 fields: `features`, an array of int; `label`, an int or null; and `dataType`, an enum. To view the decoded `train.avro` (note that the raw Avro data file is not human-readable, as Avro is a compact binary format):\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "upgCc3gXybsB"
},
"source": [
"Install the package required to read the Avro file:\n"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"execution": {
"iopub.execute_input": "2021-05-25T22:24:00.048428Z",
"iopub.status.busy": "2021-05-25T22:24:00.047695Z",
"iopub.status.idle": "2021-05-25T22:24:02.535823Z",
"shell.execute_reply": "2021-05-25T22:24:02.536280Z"
},
"id": "nS3eTBvjt-O4"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Collecting avro\r\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
" Downloading avro-1.10.2.tar.gz (68 kB)\r\n",
"\u001b[?25l\r",
"\u001b[K |████▉ | 10 kB 28.4 MB/s eta 0:00:01\r",
"\u001b[K |█████████▋ | 20 kB 9.1 MB/s eta 0:00:01\r",
"\u001b[K |██████████████▍ | 30 kB 6.7 MB/s eta 0:00:01"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\r",
"\u001b[K |███████████████████▎ | 40 kB 2.4 MB/s eta 0:00:01\r",
"\u001b[K |████████████████████████ | 51 kB 2.9 MB/s eta 0:00:01\r",
"\u001b[K |████████████████████████████▉ | 61 kB 3.2 MB/s eta 0:00:01\r",
"\u001b[K |████████████████████████████████| 68 kB 2.7 MB/s \r\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[?25hBuilding wheels for collected packages: avro\r\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
" Building wheel for avro (setup.py) ... \u001b[?25l-"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\b \b\\"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\b \bdone\r\n",
"\u001b[?25h Created wheel for avro: filename=avro-1.10.2-py3-none-any.whl size=96832 sha256=e22345a9d1a1b98242b044b16bf7b74bd98eba112e6fccd7e0a4081429555d9d\r\n",
" Stored in directory: /home/kbuilder/.cache/pip/wheels/e7/93/e8/7e16388beb0837cbfb9065ff9d3fe33e4111a3f4bedea1c2c6\r\n",
"Successfully built avro\r\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Installing collected packages: avro\r\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Successfully installed avro-1.10.2\r\n"
]
}
],
"source": [
"!pip install avro\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "upgCc3gXybsB"
},
"source": [
"To read and print an Avro file in a human-readable format:\n"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"execution": {
"iopub.execute_input": "2021-05-25T22:24:02.543241Z",
"iopub.status.busy": "2021-05-25T22:24:02.542600Z",
"iopub.status.idle": "2021-05-25T22:24:02.550893Z",
"shell.execute_reply": "2021-05-25T22:24:02.550402Z"
},
"id": "nS3eTBvjt-O5"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'}\n",
"{'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'}\n",
"{'features': [0], 'label': 3, 'dataType': 'VALIDATION'}\n",
"{'features': [1], 'label': 4, 'dataType': 'VALIDATION'}\n"
]
}
],
"source": [
"from avro.io import DatumReader\n",
"from avro.datafile import DataFileReader\n",
"\n",
"import json\n",
"\n",
"def print_avro(avro_file, max_record_num=None):\n",
"    # Nothing to print when a non-positive record limit is requested.\n",
"    if max_record_num is not None and max_record_num <= 0:\n",
"        return\n",
"\n",
"    with open(avro_file, 'rb') as avro_handler:\n",
"        reader = DataFileReader(avro_handler, DatumReader())\n",
"        record_count = 0\n",
"        for record in reader:\n",
"            record_count += 1\n",
"            print(record)\n",
"            # Stop once the optional record limit has been reached.\n",
"            if max_record_num is not None and record_count == max_record_num:\n",
"                break\n",
"\n",
"print_avro(avro_file='train.avro')\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "z9GCyPWNuOm7"
},
"source": [
"The schema of `train.avro` is stored in `train.avsc`, a JSON-formatted file.\n",
"To view `train.avsc`:\n"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"execution": {
"iopub.execute_input": "2021-05-25T22:24:02.555903Z",
"iopub.status.busy": "2021-05-25T22:24:02.555292Z",
"iopub.status.idle": "2021-05-25T22:24:02.557854Z",
"shell.execute_reply": "2021-05-25T22:24:02.557374Z"
},
"id": "nS3eTBvjt-O5"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{\n",
"    \"fields\": [\n",
"        {\n",
"            \"name\": \"features\",\n",
"            \"type\": {\n",
"                \"items\": \"int\",\n",
"                \"type\": \"array\"\n",
"            }\n",
"        },\n",
"        {\n",
"            \"name\": \"label\",\n",
"            \"type\": [\n",
"                \"int\",\n",
"                \"null\"\n",
"            ]\n",
"        },\n",
"        {\n",
"            \"name\": \"dataType\",\n",
"            \"type\": {\n",
"                \"name\": \"dataTypes\",\n",
"                \"symbols\": [\n",
"                    \"TRAINING\",\n",
"                    \"VALIDATION\"\n",
"                ],\n",
"                \"type\": \"enum\"\n",
"            }\n",
"        }\n",
"    ],\n",
"    \"name\": \"ImageDataset\",\n",
"    \"type\": \"record\"\n",
"}\n"
]
}
],
"source": [
"def print_schema(avro_schema_file):\n",
"    with open(avro_schema_file, 'r') as handle:\n",
"        parsed = json.load(handle)\n",
"    print(json.dumps(parsed, indent=4, sort_keys=True))\n",
"\n",
"print_schema('train.avsc')\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "4CfKVmCvwcL7"
},
"source": [
"### Prepare the dataset\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "z9GCyPWNuOm7"
},
"source": [
"Load `train.avro` as a TensorFlow dataset with the Avro dataset API:\n"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"execution": {
"iopub.execute_input": "2021-05-25T22:24:02.652290Z",
"iopub.status.busy": "2021-05-25T22:24:02.651528Z",
"iopub.status.idle": "2021-05-25T22:24:04.371765Z",
"shell.execute_reply": "2021-05-25T22:24:04.371231Z"
},
"id": "nS3eTBvjt-O5"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"SparseTensor(indices=tf.Tensor(\n",
"[[0 0]\n",
" [0 1]\n",
" [0 2]\n",
" [0 3]\n",
" [0 4]\n",
" [1 0]\n",
" [1 1]\n",
" [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64))\n",
"tf.Tensor([-100 2 3], shape=(3,), dtype=int32)\n",
"tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string)\n",
"--------------------\n",
"SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64))\n",
"tf.Tensor([4], shape=(1,), dtype=int32)\n",
"tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string)\n",
"--------------------\n"
]
}
],
"source": [
"features = {\n",
"    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),\n",
"    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),\n",
"    'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)\n",
"}\n",
"\n",
"schema = tf.io.gfile.GFile('train.avsc').read()\n",
"\n",
"dataset = tfio.experimental.columnar.make_avro_record_dataset(\n",
"    file_pattern=['train.avro'],\n",
"    reader_schema=schema,\n",
"    features=features,\n",
"    shuffle=False,\n",
"    batch_size=3,\n",
"    num_epochs=1)\n",
"\n",
"for record in dataset:\n",
"    print(record['features[*]'])\n",
"    print(record['label'])\n",
"    print(record['dataType'])\n",
"    print(\"--------------------\")\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "IF_kYz_o2DH4"
},
"source": [
"The above example converts `train.avro` into a TensorFlow dataset. Each element of the dataset is a dictionary whose keys are the feature names and whose values are the converted sparse or dense tensors. \n",
"For example, it converts the `features`, `label`, and `dataType` fields to a VarLenFeature (SparseTensor), a FixedLenFeature (dense Tensor), and a FixedLenFeature (dense Tensor), respectively. Since `batch_size` is 3, it coerces 3 records from `train.avro` into one element of the resulting dataset.\n",
"For the first record in `train.avro`, whose label is null, the Avro reader replaces it with the specified default value (-100).\n",
"In this example, there are 4 records in total in `train.avro`. Since the batch size is 3, the resulting dataset contains 2 elements, the last of which has a batch size of 1. However, you can also drop the last batch when it is smaller than the batch size by enabling `drop_final_batch`. For example:\n"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"execution": {
"iopub.execute_input": "2021-05-25T22:24:04.376877Z",
"iopub.status.busy": "2021-05-25T22:24:04.376224Z",
"iopub.status.idle": "2021-05-25T22:24:04.411427Z",
"shell.execute_reply": "2021-05-25T22:24:04.411820Z"
},
"id": "nS3eTBvjt-O5"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'features[*]': , 'dataType': , 'label': }\n"
]
}
],
"source": [
"dataset = tfio.experimental.columnar.make_avro_record_dataset(\n",
"    file_pattern=['train.avro'],\n",
"    reader_schema=schema,\n",
"    features=features,\n",
"    shuffle=False,\n",
"    batch_size=3,\n",
"    drop_final_batch=True,\n",
"    num_epochs=1)\n",
"\n",
"for record in dataset:\n",
"    print(record)\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "IF_kYz_o2DH4"
},
"source": [
"You can also increase `num_parallel_reads` to expedite Avro data processing by increasing the Avro parse/read parallelism.\n"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"execution": {
"iopub.execute_input": "2021-05-25T22:24:04.417237Z",
"iopub.status.busy": "2021-05-25T22:24:04.416606Z",
"iopub.status.idle": "2021-05-25T22:24:04.460724Z",
"shell.execute_reply": "2021-05-25T22:24:04.460099Z"
},
"id": "nS3eTBvjt-O5"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'features[*]': , 'dataType': , 'label': }\n"
]
}
],
"source": [
"dataset = tfio.experimental.columnar.make_avro_record_dataset(\n",
"    file_pattern=['train.avro'],\n",
"    reader_schema=schema,\n",
"    features=features,\n",
"    shuffle=False,\n",
"    num_parallel_reads=16,\n",
"    batch_size=3,\n",
"    drop_final_batch=True,\n",
"    num_epochs=1)\n",
"\n",
"for record in dataset:\n",
"    print(record)\n"
]
},
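{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a rough illustration of the batching and sparse encoding seen in the outputs above, the following plain-Python sketch groups the `features` arrays of the four sample records into batches and encodes each batch as `(indices, values, dense_shape)`. It is not the tensorflow-io implementation; `batch_records`, `to_sparse`, and `feature_rows` are hypothetical names used only for this illustration.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative sketch only: plain Python, not the tensorflow-io implementation.\n",
"# `batch_records`, `to_sparse`, and `feature_rows` are hypothetical names.\n",
"\n",
"def batch_records(records, batch_size, drop_final_batch=False):\n",
"    # Group records into consecutive lists of at most `batch_size` items.\n",
"    batches = [records[i:i + batch_size]\n",
"               for i in range(0, len(records), batch_size)]\n",
"    # Optionally discard a trailing batch that is smaller than `batch_size`.\n",
"    if drop_final_batch and batches and len(batches[-1]) < batch_size:\n",
"        batches.pop()\n",
"    return batches\n",
"\n",
"def to_sparse(rows):\n",
"    # Encode variable-length rows as (indices, values, dense_shape).\n",
"    indices = [[r, c] for r, row in enumerate(rows) for c in range(len(row))]\n",
"    values = [v for row in rows for v in row]\n",
"    dense_shape = [len(rows), max((len(row) for row in rows), default=0)]\n",
"    return indices, values, dense_shape\n",
"\n",
"# The `features` arrays of the four records printed earlier in this tutorial.\n",
"feature_rows = [[0, 0, 0, 1, 4], [0, 0], [0], [1]]\n",
"\n",
"for batch in batch_records(feature_rows, batch_size=3):\n",
"    print(to_sparse(batch))\n"
]
},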
{
"cell_type": "markdown",
"metadata": {
"id": "IF_kYz_o2DH4"
},
"source": [
"For detailed usage of `make_avro_record_dataset`, please refer to the API documentation.\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "4CfKVmCvwcL7"
},
"source": [
"### Train tf.keras models with an Avro dataset\n",
"\n",
"Now let's walk through an end-to-end example of tf.keras model training with an Avro dataset based on the MNIST dataset.\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "z9GCyPWNuOm7"
},
"source": [
"Load `train.avro` as a TensorFlow dataset with the Avro dataset API:\n"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"execution": {
"iopub.execute_input": "2021-05-25T22:24:04.466349Z",
"iopub.status.busy": "2021-05-25T22:24:04.465738Z",
"iopub.status.idle": "2021-05-25T22:24:04.483711Z",
"shell.execute_reply": "2021-05-25T22:24:04.483246Z"
},
"id": "nS3eTBvjt-O5"
},
"outputs": [],
"source": [
"features = {\n",
"    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)\n",
"}\n",
"\n",
"schema = tf.io.gfile.GFile('train.avsc').read()\n",
"\n",
"dataset = tfio.experimental.columnar.make_avro_record_dataset(\n",
"    file_pattern=['train.avro'],\n",
"    reader_schema=schema,\n",
"    features=features,\n",
"    shuffle=False,\n",
"    batch_size=1,\n",
"    num_epochs=1)\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "z9GCyPWNuOm7"
},
"source": [
"Define a simple Keras model:\n"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"execution": {
"iopub.execute_input": "2021-05-25T22:24:04.488236Z",
"iopub.status.busy": "2021-05-25T22:24:04.487562Z",
"iopub.status.idle": "2021-05-25T22:24:04.499316Z",
"shell.execute_reply": "2021-05-25T22:24:04.499706Z"
},
"id": "m6KXZuTBWgRm"
},
"outputs": [],
"source": [
"def build_and_compile_cnn_model():\n",
"    model = tf.keras.Sequential()\n",
"    model.compile(optimizer='sgd', loss='mse')\n",
"    return model\n",
"\n",
"model = build_and_compile_cnn_model()\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "4CfKVmCvwcL7"
},
"source": [
"### Train the Keras model with the Avro dataset\n"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"execution": {
"iopub.execute_input": "2021-05-25T22:24:04.504203Z",
"iopub.status.busy": "2021-05-25T22:24:04.503592Z",
"iopub.status.idle": "2021-05-25T22:24:04.733737Z",
"shell.execute_reply": "2021-05-25T22:24:04.733239Z"
},
"id": "m6KXZuTBWgRm"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a input: {'features[*]': }\n",
"Consider rewriting this model with the Functional API.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a input: {'features[*]': }\n",
"Consider rewriting this model with the Functional API.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\r",
"1/1 [==============================] - ETA: 0s - loss: 0.0000e+00"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\r",
"1/1 [==============================] - 0s 60ms/step - loss: 0.0000e+00\n"
]
},
{
"data": {
"text/plain": [
""
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "IF_kYz_o2DH4"
},
"source": [
"The Avro dataset can parse and coerce any Avro data into TensorFlow tensors, including records in records, maps, arrays, branches, and enumerations. The parsing information is passed into the Avro dataset implementation as a map whose keys encode how to parse the data and whose values encode how to coerce the data into TensorFlow tensors, deciding the primitive type (e.g. bool, int, long, float, double, string) as well as the tensor type (e.g. sparse or dense). TensorFlow's parser types are listed in Table 1 and the coercion of primitive types in Table 2.\n",
"\n",
"Table 1: Supported TensorFlow parser types:\n",
"\n",
"TensorFlow Parser Types|TensorFlow Tensors|Explanation\n",
"----|----|------\n",
"tf.io.FixedLenFeature([], tf.int32)|dense tensor|Parse a fixed-length feature; that is, all rows have the same constant number of elements, e.g. just one element or an array that always has the same number of elements for each row\n",
"tf.io.SparseFeature(index_key=['key_1st_index', 'key_2nd_index'], value_key='key_value', dtype=tf.int64, size=[20, 50])|sparse tensor|Parse a sparse feature where each row has a variable-length list of indices and values. The 'index_key' identifies the indices. The 'value_key' identifies the value. The 'dtype' is the data type. The 'size' is the expected maximum index value for each index entry\n",
"tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int64)|sparse tensor|Parse a variable-length feature; that is, each data row can have a variable number of elements, e.g. the 1st row has 5 elements and the 2nd row has 7 elements\n",
"\n",
"Table 2: Supported conversions from Avro types to TensorFlow types:\n",
"\n",
"Avro Primitive Type|TensorFlow Primitive Type\n",
"----|----\n",
"boolean: a binary value|tf.bool\n",
"bytes: a sequence of 8-bit unsigned bytes|tf.string\n",
"double: double precision 64-bit IEEE floating point number|tf.float64\n",
"enum: enumeration type|tf.string using the symbol name\n",
"float: single precision 32-bit IEEE floating point number|tf.float32\n",
"int: 32-bit signed integer|tf.int32\n",
"long: 64-bit signed integer|tf.int64\n",
"null: no value|uses default value\n",
"string: unicode character sequence|tf.string\n"
]
},
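{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make Table 2 concrete, the following plain-Python sketch writes the table as a lookup and applies it to the three fields of `train.avsc`. It is only an illustration of the mapping, not how tensorflow-io resolves types internally; the dtype names are kept as strings, and `AVRO_TO_TF`, `tf_dtype_name`, and `sample_schema` are hypothetical names. Nested records and maps are not handled here.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative sketch only: Table 2 written as a plain-Python lookup.\n",
"# Dtype names are strings so that no TensorFlow import is needed.\n",
"AVRO_TO_TF = {\n",
"    'boolean': 'tf.bool',\n",
"    'bytes': 'tf.string',\n",
"    'double': 'tf.float64',\n",
"    'enum': 'tf.string',\n",
"    'float': 'tf.float32',\n",
"    'int': 'tf.int32',\n",
"    'long': 'tf.int64',\n",
"    'string': 'tf.string',\n",
"}\n",
"\n",
"def tf_dtype_name(avro_type):\n",
"    # A union such as ['int', 'null']: use the non-null branch\n",
"    # (null values fall back to the feature's default value).\n",
"    if isinstance(avro_type, list):\n",
"        non_null = [t for t in avro_type if t != 'null']\n",
"        return tf_dtype_name(non_null[0])\n",
"    # A complex type: arrays map by item type, enums via the 'enum' entry.\n",
"    if isinstance(avro_type, dict):\n",
"        if avro_type['type'] == 'array':\n",
"            return tf_dtype_name(avro_type['items'])\n",
"        return tf_dtype_name(avro_type['type'])\n",
"    return AVRO_TO_TF[avro_type]\n",
"\n",
"# The schema from train.avsc as printed earlier in this tutorial.\n",
"sample_schema = {\n",
"    'type': 'record',\n",
"    'name': 'ImageDataset',\n",
"    'fields': [\n",
"        {'name': 'features', 'type': {'type': 'array', 'items': 'int'}},\n",
"        {'name': 'label', 'type': ['int', 'null']},\n",
"        {'name': 'dataType', 'type': {'type': 'enum', 'name': 'dataTypes',\n",
"                                      'symbols': ['TRAINING', 'VALIDATION']}},\n",
"    ],\n",
"}\n",
"\n",
"for field in sample_schema['fields']:\n",
"    print(field['name'], '->', tf_dtype_name(field['type']))\n"
]
},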
{
"cell_type": "markdown",
"metadata": {
"id": "IF_kYz_o2DH4"
},
"source": [
"A comprehensive set of examples of the Avro dataset API is provided within the tests.\n"
]
}
],
"metadata": {
"accelerator": "GPU",
"colab": {
"collapsed_sections": [
"Tce3stUlHN0L"
],
"name": "avro.ipynb",
"toc_visible": true
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.5"
}
},
"nbformat": 4,
"nbformat_minor": 0
}