Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -733,9 +733,25 @@ Content-Type: application/json
"error_description": "The access token expired"
}
```

The worker will request a token refresh through the standard OAuth refresh token flow.

### Invalidating Refresh Token

If you need to stop receiving data for a specific account (for example, after an account deletion), invalidating the refresh token (and any associated access tokens) is sufficient.
Sending following response with refresh token will ensure that the token can no longer be used and that no further actions should be taken with it:
```http
HTTP/1.1 401 Unauthorized
Content-Type: application/json

{
"error": "session_invalidated",
"error_description": "The refresh token expired"
}
```

The workers will stop using the refresh token for current and subsequent recurring jobs


### Destination Full

If the destination is unable to accept additional data for user due to capacity constrain or other limitations,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.datatransferproject.datatransfer.generic;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Optional;
import javax.annotation.Nullable;

@JsonIgnoreProperties(ignoreUnknown = true)
public class ErrorResponse {
private final String error;
private final Optional<String> errorDescription;

@JsonCreator
public ErrorResponse(
@JsonProperty(value = "error", required = true) String error,
@Nullable @JsonProperty("error_description") String errorDescription) {
this.error = error;
this.errorDescription = Optional.ofNullable(errorDescription);
}

public String getError() {
return error;
}

public Optional<String> getErrorDescription() {
return errorDescription;
}

public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(error);
if (errorDescription.isPresent()) {
builder.append(" - ");
builder.append(errorDescription.get());
}
return builder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper;
import org.datatransferproject.spi.transfer.types.DestinationMemoryFullException;
import org.datatransferproject.spi.transfer.types.InvalidTokenException;
import org.datatransferproject.spi.transfer.types.SessionInvalidatedException;
import org.datatransferproject.types.common.models.ContainerResource;
import org.datatransferproject.types.transfer.auth.AppCredentials;
import org.datatransferproject.types.transfer.auth.AuthData;
Expand All @@ -44,7 +45,7 @@ public GenericFileImporter(
@Override
public boolean importSingleItem(
UUID jobId, TokensAndUrlAuthData authData, ImportableData<R> dataItem)
throws IOException, InvalidTokenException, DestinationMemoryFullException {
throws IOException, InvalidTokenException, DestinationMemoryFullException, SessionInvalidatedException {
if (dataItem instanceof ImportableFileData) {
return importSingleFileItem(jobId, authData, (ImportableFileData<R>) dataItem);
} else {
Expand All @@ -54,7 +55,7 @@ public boolean importSingleItem(

private <T> boolean importSingleFileItem(
UUID jobId, AuthData authData, ImportableFileData<R> data)
throws IOException, InvalidTokenException, DestinationMemoryFullException {
throws IOException, InvalidTokenException, DestinationMemoryFullException, SessionInvalidatedException {
InputStreamWrapper wrapper = connectionProvider.getInputStreamForItem(jobId, data.getFile());
File tempFile =
dataStore.getTempFileFromInputStream(wrapper.getStream(), data.getFile().getName(), null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.datatransferproject.spi.transfer.provider.Importer;
import org.datatransferproject.spi.transfer.types.DestinationMemoryFullException;
import org.datatransferproject.spi.transfer.types.InvalidTokenException;
import org.datatransferproject.spi.transfer.types.SessionInvalidatedException;
import org.datatransferproject.transfer.JobMetadata;
import org.datatransferproject.types.common.models.ContainerResource;
import org.datatransferproject.types.transfer.auth.AppCredentials;
Expand All @@ -41,38 +42,6 @@
public class GenericImporter<C extends ContainerResource, R>
implements Importer<TokensAndUrlAuthData, C> {

@JsonIgnoreProperties(ignoreUnknown = true)
static class ErrorResponse {
private final String error;
private final Optional<String> errorDescription;

@JsonCreator
public ErrorResponse(
@JsonProperty(value = "error", required = true) String error,
@Nullable @JsonProperty("error_description") String errorDescription) {
this.error = error;
this.errorDescription = Optional.ofNullable(errorDescription);
}

public String getError() {
return error;
}

public Optional<String> getErrorDescription() {
return errorDescription;
}

public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(error);
if (errorDescription.isPresent()) {
builder.append(" - ");
builder.append(errorDescription.get());
}
return builder.toString();
}
}

ContainerSerializer<C, R> containerSerializer;
URL endpoint;
Monitor monitor;
Expand Down Expand Up @@ -131,7 +100,7 @@ public ImportResult importItem(
return new ImportResult(ResultType.OK);
}

boolean parseResponse(Response response) throws IOException, InvalidTokenException, DestinationMemoryFullException {
boolean parseResponse(Response response) throws IOException, InvalidTokenException, DestinationMemoryFullException, SessionInvalidatedException {
if (response.code() >= 400) {
byte[] body = response.body().bytes();
ErrorResponse error;
Expand All @@ -147,7 +116,9 @@ boolean parseResponse(Response response) throws IOException, InvalidTokenExcepti

if (response.code() == 401 && error.getError().equals("invalid_token")) {
throw new InvalidTokenException(error.toString(), null);
} if (response.code() == 413 && error.getError().equals("destination_full")) {
}

if (response.code() == 413 && error.getError().equals("destination_full")) {
throw new DestinationMemoryFullException(
String.format("Generic importer failed with code (%s)", response.code()),
new RuntimeException("destination_full"));
Expand All @@ -162,7 +133,7 @@ boolean parseResponse(Response response) throws IOException, InvalidTokenExcepti
}

boolean importSingleItem(UUID jobId, TokensAndUrlAuthData authData, ImportableData<R> dataItem)
throws IOException, InvalidTokenException, DestinationMemoryFullException {
throws IOException, InvalidTokenException, DestinationMemoryFullException, SessionInvalidatedException {

Request.Builder builder = new Request.Builder()
.url(endpoint)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import okhttp3.Request;
import okhttp3.Response;
import org.datatransferproject.api.launcher.Monitor;
import org.datatransferproject.datatransfer.generic.ErrorResponse;
import org.datatransferproject.spi.transfer.types.InvalidTokenException;
import org.datatransferproject.spi.transfer.types.SessionInvalidatedException;
import org.datatransferproject.types.transfer.auth.AppCredentials;
import org.datatransferproject.types.transfer.auth.TokensAndUrlAuthData;

Expand Down Expand Up @@ -101,7 +103,7 @@ public OAuthTokenManager(
this.om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

private TokensAndUrlAuthData refreshToken() throws IOException {
private TokensAndUrlAuthData refreshToken() throws IOException, SessionInvalidatedException {
monitor.info(() -> "Refreshing OAuth token");
Request request =
new Request.Builder()
Expand All @@ -117,13 +119,29 @@ private TokensAndUrlAuthData refreshToken() throws IOException {
.build();

try (Response response = client.newCall(request).execute()) {
byte[] body = response.body().bytes();
if (response.code() >= 400) {
if(response.code() == 401) {
ErrorResponse error;
try {
error = om.readValue(body, ErrorResponse.class);
} catch (JsonParseException | JsonMappingException e) {
throw new IOException(
format(
"Unexpected response while refreshing token: %s",
new String(body, StandardCharsets.UTF_8)),
e);
}
if(error.getError().equals("session_invalidated")) {
throw new SessionInvalidatedException(error.getErrorDescription().orElse("session_invalidated"), null);
}
}

throw new IOException(
format(
"Error while refreshing token (%d): %s",
response.code(), new String(response.body().bytes(), StandardCharsets.UTF_8)));
response.code(), new String(body, StandardCharsets.UTF_8)));
}
byte[] body = response.body().bytes();
RefreshTokenResponse responsePayload;
try {
responsePayload = om.readValue(body, RefreshTokenResponse.class);
Expand Down Expand Up @@ -154,7 +172,7 @@ private TokensAndUrlAuthData refreshToken() throws IOException {
* access token has been refreshed
*/
public <T, Ex extends Exception> T withAuthData(FunctionRequiringAuthData<T, Ex> f)
throws Ex, InvalidTokenException, IOException {
throws Ex, InvalidTokenException, IOException, SessionInvalidatedException {
try {
return f.execute(authData);
} catch (InvalidTokenException e) {
Expand Down
Loading