Skip to content

Commit db002d2

Browse files
authored
feat: add recurring job id to job metadata, use it in GenericImporter (#1475)
Add the `recurringJobId` field to `JobMetadata` and `PortabilityJob` classes. When present, it means the current job is part of a recurring transfer series, all sharing the same id. When present, pass it in `X-DTP-Recurring-Job-Id` HTTP header from `Generic*Importer` classes -- similarly to `X-DTP-Job-Id` header introduced in #1473.
1 parent 7ed8262 commit db002d2

8 files changed

Lines changed: 181 additions & 11 deletions

File tree

extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/GenericFileImporter.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ private <T> boolean importSingleFileItem(
6060
dataStore.getTempFileFromInputStream(wrapper.getStream(), data.getFile().getName(), null);
6161
MediaType mimeType =
6262
Optional.ofNullable(MediaType.parse(data.getFileMimeType())).orElse(OCTET_STREAM);
63-
Request request =
63+
64+
Request.Builder builder =
6465
new Request.Builder()
6566
.url(endpoint)
6667
.addHeader("Authorization", format("Bearer %s", authData.getToken()))
@@ -71,8 +72,13 @@ private <T> boolean importSingleFileItem(
7172
.setType(MULTIPART_RELATED)
7273
.addPart(RequestBody.create(JSON, om.writeValueAsBytes(data.getJsonData())))
7374
.addPart(MultipartBody.create(mimeType, tempFile))
74-
.build())
75-
.build();
75+
.build());
76+
77+
if (this.recurringJobId.isPresent()) {
78+
builder.addHeader("X-DTP-Recurring-Job-Id", this.recurringJobId.get().toString());
79+
}
80+
81+
Request request = builder.build();
7682

7783
try (Response response = client.newCall(request).execute()) {
7884
return parseResponse(response);

extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/GenericImporter.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public String toString() {
8181
ObjectMapper om = new ObjectMapper();
8282
Map<UUID, OAuthTokenManager> jobTokenManagerMap = new HashMap<>();
8383
protected final String exportService;
84+
protected final Optional<UUID> recurringJobId;
8485

8586
static final MediaType JSON = MediaType.parse("application/json");
8687

@@ -94,6 +95,7 @@ public GenericImporter(
9495
this.endpoint = endpoint;
9596
this.containerSerializer = containerSerializer;
9697
this.exportService = JobMetadata.getExportService();
98+
this.recurringJobId = JobMetadata.getRecurringJobId();
9799
configureObjectMapper(om);
98100
}
99101

@@ -162,14 +164,18 @@ boolean parseResponse(Response response) throws IOException, InvalidTokenExcepti
162164
boolean importSingleItem(UUID jobId, TokensAndUrlAuthData authData, ImportableData<R> dataItem)
163165
throws IOException, InvalidTokenException, DestinationMemoryFullException {
164166

165-
Request request =
166-
new Request.Builder()
167-
.url(endpoint)
168-
.addHeader("Authorization", format("Bearer %s", authData.getToken()))
169-
.addHeader("X-DTP-Export-Service", this.exportService)
170-
.addHeader("X-DTP-Job-Id", jobId.toString())
171-
.post(RequestBody.create(JSON, om.writeValueAsBytes(dataItem.getJsonData())))
172-
.build();
167+
Request.Builder builder = new Request.Builder()
168+
.url(endpoint)
169+
.addHeader("Authorization", format("Bearer %s", authData.getToken()))
170+
.addHeader("X-DTP-Export-Service", this.exportService)
171+
.addHeader("X-DTP-Job-Id", jobId.toString())
172+
.post(RequestBody.create(JSON, om.writeValueAsBytes(dataItem.getJsonData())));
173+
174+
if (this.recurringJobId.isPresent()) {
175+
builder.addHeader("X-DTP-Recurring-Job-Id", this.recurringJobId.get().toString());
176+
}
177+
178+
Request request = builder.build();
173179

174180
try (Response response = client.newCall(request).execute()) {
175181
return parseResponse(response);

extensions/data-transfer/portability-data-transfer-generic/src/test/java/org/datatransferproject/datatransfer/generic/GenericFileImporterTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.io.IOException;
1111
import java.nio.charset.StandardCharsets;
1212
import java.util.Arrays;
13+
import java.util.Optional;
1314
import java.util.UUID;
1415
import okhttp3.Headers;
1516
import okhttp3.mockwebserver.MockResponse;
@@ -42,6 +43,7 @@ public InputStreamWrapper getStream(UUID jobId, String key) throws IOException {
4243
}
4344
};
4445
private static final UUID MOCK_JOB_ID = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
46+
private static final UUID MOCK_RECURRING_JOB_ID = UUID.fromString("123e4567-e89b-12d3-a456-426614174001");
4547
private static final String MOCK_EXPORT_SERVICE = "mockExportService";
4648
private MockedStatic<JobMetadata> jobMetadataMock;
4749

@@ -220,4 +222,43 @@ public void testGenericFileImporterPassingJobMetadata() throws Exception {
220222
assertEquals(MOCK_JOB_ID.toString(), request.getHeader("X-DTP-Job-Id"));
221223
assertTrue(executor.getErrors().isEmpty());
222224
}
225+
226+
227+
@Test
228+
public void testGenericFileImporterPassingJobMetadataRecurringJob() throws Exception {
229+
jobMetadataMock.when(JobMetadata::getRecurringJobId).thenReturn(Optional.of(MOCK_RECURRING_JOB_ID));
230+
231+
GenericFileImporter<IdOnlyContainerResource, String> importer =
232+
new GenericFileImporter<>(
233+
container ->
234+
Arrays.asList(
235+
new ImportableFileData<>(
236+
new CachedDownloadableItem(container.getId(), container.getId()),
237+
"video/mp4",
238+
new GenericPayload<>(container.getId(), "schemasource"),
239+
container.getId(),
240+
container.getId())),
241+
new AppCredentials("key", "secret"),
242+
webServer.url("/id").url(),
243+
dataStore,
244+
monitor);
245+
InMemoryIdempotentImportExecutor executor = new InMemoryIdempotentImportExecutor(monitor);
246+
webServer.enqueue(new MockResponse().setResponseCode(201).setBody("OK"));
247+
248+
importer.importItem(
249+
MOCK_JOB_ID,
250+
executor,
251+
new TokensAndUrlAuthData(
252+
"accessToken", "refreshToken", webServer.url("/refresh").toString()),
253+
new IdOnlyContainerResource("id"));
254+
255+
assertEquals(1, webServer.getRequestCount());
256+
257+
RecordedRequest request = webServer.takeRequest();
258+
assertEquals("POST", request.getMethod());
259+
assertEquals(this.MOCK_EXPORT_SERVICE, request.getHeader("X-DTP-Export-Service"));
260+
assertEquals(MOCK_JOB_ID.toString(), request.getHeader("X-DTP-Job-Id"));
261+
assertEquals(MOCK_RECURRING_JOB_ID.toString(), request.getHeader("X-DTP-Recurring-Job-Id"));
262+
assertTrue(executor.getErrors().isEmpty());
263+
}
223264
}

extensions/data-transfer/portability-data-transfer-generic/src/test/java/org/datatransferproject/datatransfer/generic/GenericImporterTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.Arrays;
1111
import java.util.Collection;
1212
import java.util.List;
13+
import java.util.Optional;
1314
import java.util.UUID;
1415
import okhttp3.mockwebserver.Dispatcher;
1516
import okhttp3.mockwebserver.MockResponse;
@@ -41,6 +42,7 @@ public class GenericImporterTest {
4142
@Parameter public String importerClass;
4243
private MockWebServer webServer;
4344
private static final UUID MOCK_JOB_ID = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
45+
private static final UUID MOCK_RECURRING_JOB_ID = UUID.fromString("123e4567-e89b-12d3-a456-426614174001");
4446
private static final String MOCK_EXPORT_SERVICE = "mockExportService";
4547
private MockedStatic<JobMetadata> jobMetadataMock;
4648

@@ -391,4 +393,37 @@ public void testGenericImporterPassingJobMetadata() throws Exception {
391393
assertEquals(MOCK_JOB_ID.toString(), request.getHeader("X-DTP-Job-Id"));
392394
assertTrue(executor.getErrors().isEmpty());
393395
}
396+
397+
@Test
398+
public void testGenericImporterPassingJobMetadataRecurringJob() throws Exception {
399+
jobMetadataMock.when(JobMetadata::getRecurringJobId).thenReturn(Optional.of(MOCK_RECURRING_JOB_ID));
400+
401+
InMemoryIdempotentImportExecutor executor = new InMemoryIdempotentImportExecutor(monitor);
402+
GenericImporter<IdOnlyContainerResource, String> importer =
403+
getImporter(
404+
importerClass,
405+
container ->
406+
List.of(
407+
new ImportableData<>(
408+
new GenericPayload<>(container.getId(), "schemasource"),
409+
container.getId(),
410+
container.getId())));
411+
webServer.setDispatcher(getDispatcher());
412+
413+
importer.importItem(
414+
MOCK_JOB_ID,
415+
executor,
416+
new TokensAndUrlAuthData(
417+
"accessToken", "refreshToken", webServer.url("/refresh").toString()),
418+
new IdOnlyContainerResource("id"));
419+
420+
assertEquals(1, webServer.getRequestCount());
421+
RecordedRequest request = webServer.takeRequest();
422+
assertEquals("POST", request.getMethod());
423+
assertEquals(this.MOCK_EXPORT_SERVICE, request.getHeader("X-DTP-Export-Service"));
424+
assertEquals(MOCK_JOB_ID.toString(), request.getHeader("X-DTP-Job-Id"));
425+
assertEquals(MOCK_RECURRING_JOB_ID.toString(), request.getHeader("X-DTP-Recurring-Job-Id"));
426+
assertTrue(executor.getErrors().isEmpty());
427+
}
428+
394429
}

portability-spi-cloud/src/main/java/org/datatransferproject/spi/cloud/types/PortabilityJob.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.time.Instant;
1212
import java.util.Map;
1313
import java.util.TimeZone;
14+
import java.util.UUID;
1415
import javax.annotation.Nullable;
1516
import org.datatransferproject.types.common.models.DataVertical;
1617

@@ -38,6 +39,7 @@ public abstract class PortabilityJob {
3839
private static final String EXPORT_ENCRYPTED_INITIAL_AUTH_DATA =
3940
"EXPORT_ENCRYPTED_INITIAL_AUTH_DATA";
4041
private static final String JOB_STATE = "JOB_STATE";
42+
private static final String RECURRING_JOB_ID = "RECURRING_JOB_ID";
4143
private static final String TRANSFER_MODE = "TRANSFER_MODE";
4244
private static final String FAILURE_REASON = "FAILURE_REASON";
4345
private static final String NUMBER_OF_FAILED_FILES_KEY = "NUM_FAILED_FILES";
@@ -110,6 +112,11 @@ public static PortabilityJob fromMap(Map<String, Object> properties) {
110112
? DataVertical.fromDataType((String) properties.get(DATA_TYPE_KEY))
111113
: null;
112114

115+
UUID recurringJobId =
116+
properties.containsKey(RECURRING_JOB_ID)
117+
? UUID.fromString((String) properties.get(RECURRING_JOB_ID))
118+
: null;
119+
113120
return PortabilityJob.builder()
114121
.setState(state)
115122
.setExportService((String) properties.get(EXPORT_SERVICE_KEY))
@@ -135,6 +142,7 @@ public static PortabilityJob fromMap(Map<String, Object> properties) {
135142
.setUserLocale(userLocale)
136143
.setUserAlias(userAlias)
137144
.setTransferMode(transferMode)
145+
.setRecurringJobId(recurringJobId)
138146
.build();
139147
}
140148

@@ -197,6 +205,10 @@ private static void isSet(String... strings) {
197205
@JsonProperty("transferMode")
198206
public abstract TransferMode transferMode();
199207

208+
@Nullable
209+
@JsonProperty("recurringJobId")
210+
public abstract UUID recurringJobId();
211+
200212
public abstract PortabilityJob.Builder toBuilder();
201213

202214
public Map<String, Object> toMap() {
@@ -263,6 +275,10 @@ public Map<String, Object> toMap() {
263275
builder.put(TRANSFER_MODE, transferMode().toString());
264276
}
265277

278+
if (null != recurringJobId()) {
279+
builder.put(RECURRING_JOB_ID, recurringJobId());
280+
}
281+
266282
return builder.build();
267283
}
268284

@@ -361,6 +377,10 @@ public Builder setAndValidateJobAuthorization(JobAuthorization jobAuthorization)
361377
@JsonProperty("transferMode")
362378
public abstract Builder setTransferMode(TransferMode transferMode);
363379

380+
@JsonInclude(JsonInclude.Include.NON_NULL)
381+
@JsonProperty("recurringJobId")
382+
public abstract Builder setRecurringJobId(@Nullable UUID recurringJobId);
383+
364384
// For internal use only; clients should use setAndValidateJobAuthorization
365385
protected abstract Builder setJobAuthorization(JobAuthorization jobAuthorization);
366386
}

portability-transfer/src/main/java/org/datatransferproject/transfer/JobMetadata.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
import com.google.common.base.Preconditions;
1919
import com.google.common.base.Stopwatch;
2020

21+
import javax.annotation.Nullable;
22+
23+
import java.util.Optional;
2124
import java.util.UUID;
2225
import org.datatransferproject.types.common.models.DataVertical;
2326

@@ -34,12 +37,14 @@
3437
public final class JobMetadata {
3538
private static byte[] encodedPrivateKey = null;
3639
private static UUID jobId = null;
40+
private static UUID recurringJobId = null;
3741
private static DataVertical dataType = null;
3842
private static String exportService = null;
3943
private static String importService = null;
4044
private static Stopwatch stopWatch = null;
4145

4246
public static boolean isInitialized() {
47+
// recurringJobId can be null and that's ok
4348
return (jobId != null
4449
&& encodedPrivateKey != null
4550
&& dataType != null
@@ -64,9 +69,28 @@ static void init(
6469
stopWatch = initStopWatch;
6570
}
6671

72+
static void init(
73+
UUID initJobId,
74+
@Nullable UUID initRecurringJobId,
75+
byte[] initEncodedPrivateKey,
76+
DataVertical initDataType,
77+
String initExportService,
78+
String initImportService,
79+
Stopwatch initStopWatch) {
80+
Preconditions.checkState(!isInitialized(), "JobMetadata cannot be initialized twice");
81+
jobId = initJobId;
82+
recurringJobId = initRecurringJobId;
83+
encodedPrivateKey = initEncodedPrivateKey;
84+
dataType = initDataType;
85+
exportService = initExportService;
86+
importService = initImportService;
87+
stopWatch = initStopWatch;
88+
}
89+
6790
// TODO: remove this
6891
static synchronized void reset() {
6992
jobId = null;
93+
recurringJobId = null;
7094
encodedPrivateKey = null;
7195
dataType = null;
7296
exportService = null;
@@ -84,6 +108,11 @@ public static UUID getJobId() {
84108
return jobId;
85109
}
86110

111+
public static Optional<UUID> getRecurringJobId() {
112+
Preconditions.checkState(isInitialized(), "JobMetadata must be initialized");
113+
return Optional.ofNullable(recurringJobId);
114+
}
115+
87116
public static DataVertical getDataType() {
88117
Preconditions.checkState(isInitialized(), "JobMetadata must be initialized");
89118
return dataType;

portability-transfer/src/main/java/org/datatransferproject/transfer/JobPollingService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ private boolean tryToClaimJob(UUID jobId, WorkerKeyPair keyPair) {
206206

207207
JobMetadata.init(
208208
jobId,
209+
existingJob.recurringJobId(),
209210
keyPair.getEncodedPrivateKey(),
210211
existingJob.transferDataType(),
211212
existingJob.exportService(),

portability-transfer/src/test/java/org/datatransferproject/transfer/JobProcessorTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.datatransferproject.transfer;
1818

19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertTrue;
1921
import static org.mockito.ArgumentMatchers.any;
2022
import static org.mockito.ArgumentMatchers.anyString;
2123
import static org.mockito.ArgumentMatchers.eq;
@@ -52,6 +54,7 @@
5254
public class JobProcessorTest {
5355

5456
private UUID jobId;
57+
private UUID recurringJobId;
5558
private ExportInformation exportInfo;
5659
private AuthData exportAuthData;
5760
private AuthData importAuthData;
@@ -96,6 +99,7 @@ public TestJobProcessor(JobStore jobStore,
9699
public void setUp() throws JsonProcessingException {
97100
importAuthData = exportAuthData = Mockito.mock(AuthData.class);
98101
jobId = UUID.randomUUID();
102+
recurringJobId = UUID.randomUUID();
99103
exportInfo = Mockito.mock(ExportInformation.class);
100104
copier = Mockito.mock(InMemoryDataCopier.class);
101105
importSignalHandlerProvider = (Provider<SignalHandler>) Mockito.mock(Provider.class);
@@ -213,6 +217,9 @@ public void processJobCopiesSuccessfullyWithTransferSignalDisabled() throws Copy
213217
"",
214218
"",
215219
Stopwatch.createUnstarted());
220+
221+
assertTrue(JobMetadata.getRecurringJobId().isEmpty());
222+
216223
Mockito.doThrow(new CopyException("error", new Exception())).when(copier)
217224
.copy(importAuthData, exportAuthData, jobId, Optional.of(exportInfo));
218225
processor.processJob();
@@ -226,4 +233,29 @@ public void processJobCopiesSuccessfullyWithTransferSignalDisabled() throws Copy
226233
Mockito.verify(exportSignalHandler, Mockito.never())
227234
.sendSignal(any(SignalRequest.class), eq(exportAuthData), any(Monitor.class));
228235
}
236+
237+
@Test
238+
public void testInitJobMetadataWithRecurringId() throws CopyException,
239+
IOException, RetryException {
240+
processor = Mockito.spy(
241+
new TestJobProcessor(jobStore,
242+
copier,
243+
objectMapper,
244+
decryptionService,
245+
Boolean.FALSE,
246+
importSignalHandlerProvider,
247+
exportSignalHandlerProvider));
248+
249+
JobMetadata.init(
250+
jobId,
251+
recurringJobId,
252+
"".getBytes(),
253+
DataVertical.BLOBS,
254+
"",
255+
"",
256+
Stopwatch.createUnstarted());
257+
258+
assertEquals(jobId, JobMetadata.getJobId());
259+
assertEquals(recurringJobId, JobMetadata.getRecurringJobId().get());
260+
}
229261
}

0 commit comments

Comments
 (0)