Skip to content

Commit b2a946e

Browse files
committed
Added to_chunked_dataframes generator method in the Job class
1 parent 737eecd commit b2a946e

1 file changed

Lines changed: 43 additions & 6 deletions

File tree

sdk/python/feast/job.py

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
import tempfile
22
import time
33
from datetime import datetime, timedelta
4-
from typing import List
4+
from typing import Iterable
55
from urllib.parse import urlparse
66

77
import fastavro
88
import pandas as pd
9-
from fastavro import reader as fastavro_reader
109
from google.cloud import storage
1110

1211
from feast.serving.ServingService_pb2 import GetJobRequest
@@ -96,6 +95,7 @@ def result(self, timeout_sec: int = DEFAULT_TIMEOUT_SEC):
9695
)
9796

9897
uris = [urlparse(uri) for uri in self.job_proto.file_uris]
98+
print(uris)
9999
for file_uri in uris:
100100
if file_uri.scheme == "gs":
101101
file_obj = tempfile.TemporaryFile()
@@ -113,16 +113,53 @@ def result(self, timeout_sec: int = DEFAULT_TIMEOUT_SEC):
113113
for record in avro_reader:
114114
yield record
115115

116-
def to_dataframe(self, timeout_sec: int = DEFAULT_TIMEOUT_SEC):
116+
def to_dataframe(
117+
self, timeout_sec: int = DEFAULT_TIMEOUT_SEC
118+
) -> pd.DataFrame:
117119
"""
118-
Wait until job is done to get an interable rows of result
120+
Wait until job is done to get an iterable rows of result.
119121
120122
Args:
121-
timeout_sec: max no of seconds to wait until job is done. If "timeout_sec" is exceeded, an exception will be raised.
122-
Returns: pandas Dataframe of the feature values
123+
timeout_sec (int):
124+
Max no of seconds to wait until job is done. If "timeout_sec"
125+
is exceeded, an exception will be raised.
126+
Returns:
127+
pd.DataFrame:
128+
Pandas DataFrame of the feature values.
123129
"""
124130
records = [r for r in self.result(timeout_sec=timeout_sec)]
125131
return pd.DataFrame.from_records(records)
126132

133+
def to_chunked_dataframes(
134+
self,
135+
max_chunk_size: int,
136+
timeout_sec: int = DEFAULT_TIMEOUT_SEC
137+
) -> Iterable[pd.DataFrame]:
138+
"""
139+
Wait until a job is done to get an iterable rows of result. This method
140+
will split the response into chunked DataFrame of a specified size to
141+
to be yielded to the instance calling it.
142+
143+
Args:
144+
max_chunk_size (int):
145+
Maximum number of rows that the DataFrame should contain.
146+
147+
timeout_sec (int):
148+
Max no of seconds to wait until job is done. If "timeout_sec"
149+
is exceeded, an exception will be raised.
150+
151+
Returns:
152+
pd.DataFrame:
153+
Pandas DataFrame of the feature values.
154+
"""
155+
156+
records = []
157+
for result in self.result(timeout_sec=timeout_sec):
158+
result.append(records)
159+
if len(records) == max_chunk_size:
160+
df = pd.DataFrame.from_records(records)
161+
records.clear() # Empty records array
162+
yield df
163+
127164
def __iter__(self):
128165
return iter(self.result())

0 commit comments

Comments
 (0)