diff --git a/core/src/main/java/feast/core/model/FeatureSetJobStatus.java b/core/src/main/java/feast/core/model/FeatureSetJobStatus.java index 79a3b51da40..a7281a552bd 100644 --- a/core/src/main/java/feast/core/model/FeatureSetJobStatus.java +++ b/core/src/main/java/feast/core/model/FeatureSetJobStatus.java @@ -21,6 +21,7 @@ import java.io.Serializable; import javax.persistence.*; import javax.persistence.Entity; +import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; @@ -37,6 +38,7 @@ public class FeatureSetJobStatus { @Embeddable @EqualsAndHashCode + @AllArgsConstructor public static class FeatureSetJobStatusKey implements Serializable { public FeatureSetJobStatusKey() {} diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index c7862a386e3..59cc619fa65 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -241,8 +241,10 @@ private boolean jobRequiresUpgrade(Job job, Set stores) { * @param featureSet featureSet {@link FeatureSet} to find jobs and allocate */ FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) { - Set toAdd = new HashSet<>(); - Set existing = featureSet.getJobStatuses(); + Map current = new HashMap<>(); + Map existing = + featureSet.getJobStatuses().stream() + .collect(Collectors.toMap(FeatureSetJobStatus::getId, s -> s)); Stream> jobArgsStream = getAllStores().stream() @@ -265,14 +267,17 @@ FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) { status.setJob(job); status.setDeliveryStatus(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); - toAdd.add(status); + current.put( + new FeatureSetJobStatus.FeatureSetJobStatusKey(job.getId(), featureSet.getId()), status); } - Set toDelete = Sets.difference(existing, toAdd); - toAdd = Sets.difference(toAdd, existing); + Set toDelete = + Sets.difference(existing.keySet(), current.keySet()); + Set toAdd = + Sets.difference(current.keySet(), existing.keySet()); - jobStatusRepository.deleteAll(toDelete); - jobStatusRepository.saveAll(toAdd); + jobStatusRepository.deleteAll(toDelete.stream().map(existing::get).collect(Collectors.toSet())); + jobStatusRepository.saveAll(toAdd.stream().map(current::get).collect(Collectors.toSet())); jobStatusRepository.flush(); return featureSet; } @@ -378,7 +383,8 @@ public void notifyJobsWhenFeatureSetUpdated() { // FeatureSet). // We now set status to IN_PROGRESS, so listenAckFromJobs would be able to // monitor delivery progress for each new version. - fs.getJobStatuses().stream() + Set jobStatuses = fs.getJobStatuses(); + jobStatuses.stream() .filter(s -> s.getJob().isRunning()) .forEach( jobStatus -> { @@ -386,7 +392,8 @@ public void notifyJobsWhenFeatureSetUpdated() { FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); jobStatus.setVersion(fs.getVersion()); }); - featureSetRepository.saveAndFlush(fs); + jobStatusRepository.saveAll(jobStatuses); + jobStatusRepository.flush(); }); } diff --git a/core/src/test/java/feast/core/util/TestUtil.java b/core/src/test/java/feast/core/util/TestUtil.java index 5b3bf6b10c6..b93aaa8cd42 100644 --- a/core/src/test/java/feast/core/util/TestUtil.java +++ b/core/src/test/java/feast/core/util/TestUtil.java @@ -141,6 +141,7 @@ public static FeatureSetJobStatus CreateFeatureSetJobStatusWithJob( featureSetJobStatus.setDeliveryStatus(deliveryStatus); featureSetJobStatus.setVersion(version); + featureSetJobStatus.setId(new FeatureSetJobStatus.FeatureSetJobStatusKey(job.getId(), 0)); return featureSetJobStatus; }