-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_historical_retrieval.py
More file actions
236 lines (201 loc) Β· 10 KB
/
test_historical_retrieval.py
File metadata and controls
236 lines (201 loc) Β· 10 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
"""
Minimal example to test historical feature retrieval with Feast
Used for testing Dask/Spark offline store changes
"""
import pandas as pd
from datetime import timedelta
from feast import FeatureStore
import logging
# ============================================================================
# CONFIGURATION: Choose which feature repository to use
# ============================================================================
# Options:
# - "./feature_repo" : Dask offline store
# - "./feature_repo_spark" : Spark offline store
# - "./feature_repo_remote" : Remote Feast server
FEATURE_REPO_PATH = "./feature_repo_spark" # Change this to switch between Dask/Spark
# ============================================================================
# SECTION 1: Feast Object Creation
# ============================================================================
# Uncomment/comment this section as needed during development
# Set up logging
logging.getLogger("py4j").setLevel(logging.ERROR)
logging.getLogger("feast").setLevel(logging.INFO)
logger = logging.getLogger("feast")
# Initialize feature store
fs = FeatureStore(repo_path=FEATURE_REPO_PATH)
print("=" * 70)
print("SECTION 1: Feast Object Creation")
print("=" * 70)
print("β
Feature store initialized")
print(f"π Using offline store type: {fs.config.offline_store.type}")
print(f"π Registry: {fs.config.registry}")
print(f"πΎ Online store: {fs.config.online_store.type if hasattr(fs.config.online_store, 'type') else 'N/A'}")
# ============================================================================
# SECTION 2: Test Feast Connection & Feature Views
# ============================================================================
# Uncomment/comment this section to test if Feast is connected correctly
print("\n" + "=" * 70)
print("SECTION 2: Test Feast Connection & Feature Views")
print("=" * 70)
try:
fvs = fs.list_feature_views()
print(f"β
Feature views found: {len(fvs)}")
for fv in fvs:
print(f" - {fv.name}")
# Handle entities - they might be Entity objects or strings
entity_names = []
for e in fv.entities:
if hasattr(e, 'name'):
entity_names.append(e.name)
else:
entity_names.append(str(e))
print(f" Entities: {entity_names}")
print(f" Source: {type(fv.batch_source).__name__}")
except Exception as e:
print(f"β Error listing feature views: {e}")
import traceback
traceback.print_exc()
# ============================================================================
# SECTION 3: Test Historical Feature Retrieval Using Entity DataFrame
# ============================================================================
# Uncomment/comment this section to test entity-based historical retrieval
print("\n" + "=" * 70)
print("SECTION 3: Test Historical Feature Retrieval Using Entity DataFrame")
print("=" * 70)
# Read Parquet file to get sample data
parquet_path = f"./feature_repo_spark/data/user_stats.parquet"
try:
data_df = pd.read_parquet(parquet_path)
print(f"π Data in Parquet file: {len(data_df)} records")
print(f"π
Timestamp range: {data_df['created_timestamp'].min()} to {data_df['created_timestamp'].max()}")
print(f"π₯ User IDs range: {data_df['user_id'].min()} to {data_df['user_id'].max()}")
# Create entity dataframe for point-in-time join
# Use first 3 user_ids with timestamps after their created_timestamp
sample_data = data_df[['user_id', 'created_timestamp']].head(3)
entity_df = pd.DataFrame({
"user_id": sample_data['user_id'].tolist(),
"event_timestamp": [
ts + timedelta(hours=1) # 1 hour after each user's created_timestamp
for ts in sample_data['created_timestamp']
]
})
print(f"\nπ Entity data for feature retrieval:")
print(entity_df)
# Retrieve historical features using entity_df
print(f"\nπ Retrieving features for {len(entity_df)} entities...")
historical_features = fs.get_historical_features(
entity_df=entity_df,
features=[
"user_stats:avg_transaction_amount",
"user_stats:total_transactions",
],
)
print(f"\nβ
Historical features retrieved:")
df = historical_features.to_df()
print(f"π Result shape: {df.shape}")
if len(df) > 0:
print(f"\nπ Results:")
print(df)
else:
print("β οΈ Empty result - no features found")
except FileNotFoundError:
print(f"β οΈ Parquet file not found at {parquet_path}")
print(" Run setup_sample_data_dask.py first to create sample data")
except Exception as e:
print(f"β Error in entity-based retrieval: {e}")
import traceback
traceback.print_exc()
# ============================================================================
# SECTION 4: Test Historical Feature Retrieval Using start_date & end_date
# ============================================================================
# Uncomment/comment this section to test date range-based historical retrieval
# This is the main test for Dask/Spark offline store changes
#
# NOTE: If you see "unexpected keyword argument 'start_date'" error for Spark,
# this is expected - you're implementing this feature in SparkOfflineStore
print("\n" + "=" * 70)
print("SECTION 4: Test Historical Feature Retrieval Using start_date & end_date")
print("=" * 70)
try:
# Read Parquet file to determine appropriate date range
parquet_path = f"./feature_repo_spark/data/user_stats.parquet"
data_df = pd.read_parquet(parquet_path)
# Set date range based on actual data
start_date = data_df['created_timestamp'].min() + timedelta(days=10)
end_date = data_df['created_timestamp'].max() - timedelta(days=0)
print(f"π Data in Parquet file: {len(data_df)} records")
print(f"π
Data timestamp range: {data_df['created_timestamp'].min()} to {data_df['created_timestamp'].max()}")
print(f"π
Query date range: {start_date} to {end_date}")
# Retrieve historical features using start_date and end_date (no entity_df)
print(f"\nπ Retrieving features for date range (no entity_df)...")
historical_features = fs.get_historical_features(
features=[
"user_stats:avg_transaction_amount",
"user_stats:total_transactions",
],
start_date=start_date,
end_date=end_date,
)
print(f"\nβ
Historical features retrieved:")
df = historical_features.to_df()
print(f"π Result shape: {df.shape}")
if len(df) > 0:
# Join with source data to show created_timestamp vs event_timestamp
# This helps understand the difference between when features were created vs when they're queried
df_with_source = None
if 'user_id' in df.columns:
# Convert user_id to same type for merging
df['user_id'] = df['user_id'].astype(int)
# Merge with source data to show created_timestamp
df_with_source = df.merge(
data_df[['user_id', 'created_timestamp']],
on='user_id',
how='left',
suffixes=('', '_source')
)
# Rename for clarity
if 'created_timestamp_source' in df_with_source.columns:
df_with_source = df_with_source.rename(columns={'created_timestamp_source': 'created_timestamp_in_source'})
print(f"\nπ First 10 rows (showing both event_timestamp and created_timestamp):")
# Show key columns - handle both event_timestamp and entity_ts column names
timestamp_col = 'event_timestamp' if 'event_timestamp' in df_with_source.columns else 'entity_ts'
display_cols = ['user_id', timestamp_col]
if 'created_timestamp_in_source' in df_with_source.columns:
display_cols.append('created_timestamp_in_source')
display_cols.extend([col for col in df_with_source.columns if col not in display_cols and col != 'created_timestamp'])
print(df_with_source[display_cols].head(30))
print(f"\nπ‘ Explanation:")
print(f" - event_timestamp: Point-in-time when features are queried (query timestamp)")
print(f" - created_timestamp_in_source: When the feature was actually created/updated in source data")
print(f" - When using start_date/end_date without entity_df, Feast uses end_date as event_timestamp")
print(f" - This is correct: 'What were the features as of {end_date}?'")
else:
print(f"\nπ First 30 rows:")
print(df.head(30))
print(f"\nπ Summary:")
print(f" Total rows: {len(df)}")
if 'user_id' in df.columns:
print(f" Unique user_ids: {df['user_id'].nunique()}")
if 'event_timestamp' in df.columns:
print(f" Event timestamp range (query time): {df['event_timestamp'].min()} to {df['event_timestamp'].max()}")
if df_with_source is not None and len(df_with_source) > 0:
if 'created_timestamp_in_source' in df_with_source.columns:
print(f" Created timestamp range (source data): {df_with_source['created_timestamp_in_source'].min()} to {df_with_source['created_timestamp_in_source'].max()}")
else:
print("β οΈ Empty result - no features found in the specified date range")
print(f" Data spans: {data_df['created_timestamp'].min()} to {data_df['created_timestamp'].max()}")
print(f" Requested range: {start_date} to {end_date}")
except FileNotFoundError:
print(f"β οΈ Parquet file not found at {parquet_path}")
print(" Run setup_sample_data_dask.py first to create sample data")
except Exception as e:
print(f"β Error in date range-based retrieval: {e}")
import traceback
traceback.print_exc()
# ============================================================================
# Summary
# ============================================================================
print("\n" + "=" * 70)
print("β
Test completed!")
print("=" * 70)