Skip to content

Commit 5e4b9fd

Browse files
feat: Support hdfs:// uris in to_remote_storage for Spark offline store (#5635)
1 parent 8715ae8 commit 5e4b9fd

File tree

1 file changed

+26
-2
lines changed
  • sdk/python/feast/infra/offline_stores/contrib/spark_offline_store

1 file changed

+26
-2
lines changed

sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -497,10 +497,18 @@ def to_remote_storage(self) -> List[str]:
497497
return aws_utils.list_s3_files(
498498
self._config.offline_store.region, output_uri
499499
)
500-
500+
elif self._config.offline_store.staging_location.startswith("hdfs://"):
501+
output_uri = os.path.join(
502+
self._config.offline_store.staging_location, str(uuid.uuid4())
503+
)
504+
sdf.write.parquet(output_uri)
505+
spark_session = get_spark_session_or_start_new_with_repoconfig(
506+
store_config=self._config.offline_store
507+
)
508+
return _list_hdfs_files(spark_session, output_uri)
501509
else:
502510
raise NotImplementedError(
503-
"to_remote_storage is only implemented for file:// and s3:// uri schemes"
511+
"to_remote_storage is only implemented for file://, s3:// and hdfs:// uri schemes"
504512
)
505513

506514
else:
@@ -629,6 +637,22 @@ def _list_files_in_folder(folder):
629637
return files
630638

631639

640+
def _list_hdfs_files(spark_session: SparkSession, uri: str) -> List[str]:
    """Return the URIs of the files located directly under ``uri`` on HDFS.

    The listing is done through the Hadoop ``FileSystem`` API reached via the
    Spark JVM gateway, so no separate Python-side HDFS client is needed.

    Args:
        spark_session: Active ``SparkSession`` whose JVM gateway is used.
        uri: Directory URI to list (e.g. ``hdfs://namenode/path``).

    Returns:
        Full path strings for each entry that is a regular file; directories
        are excluded.

    Raises:
        RuntimeError: If the JVM gateway or the JavaSparkContext is not
            available (e.g. the session was stopped).
    """
    jvm, jsc = spark_session._jvm, spark_session._jsc
    if jvm is None or jsc is None:
        raise RuntimeError("Spark JVM or JavaSparkContext is not available")
    # Resolve a FileSystem instance for the scheme/authority of the URI,
    # using the Hadoop configuration carried by the JavaSparkContext.
    hadoop_path = jvm.org.apache.hadoop.fs.Path(uri)
    filesystem = jvm.org.apache.hadoop.fs.FileSystem.get(
        hadoop_path.toUri(), jsc.hadoopConfiguration()
    )
    return [
        status.getPath().toString()
        for status in filesystem.listStatus(hadoop_path)
        if status.isFile()
    ]
632656
def _cast_data_frame(
633657
df_new: pyspark.sql.DataFrame, df_existing: pyspark.sql.DataFrame
634658
) -> pyspark.sql.DataFrame:

0 commit comments

Comments
 (0)