Skip to content

Commit 5862bfa

Browse files
author
Andrew Pikler
committed
Switch to OkHttp. attempt to workaround oref api brokenness
If an API call fails for any reason, we retry a different server
1 parent 5b80331 commit 5862bfa

8 files changed

Lines changed: 175 additions & 52 deletions

File tree

oref-integration/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,16 @@
7272
<artifactId>guava</artifactId>
7373
<version>33.4.8-jre</version>
7474
</dependency>
75+
<dependency>
76+
<groupId>org.apache.commons</groupId>
77+
<artifactId>commons-compress</artifactId>
78+
<version>1.27.1</version>
79+
</dependency>
80+
<dependency>
81+
<groupId>com.squareup.okhttp3</groupId>
82+
<artifactId>okhttp</artifactId>
83+
<version>4.12.0</version>
84+
</dependency>
7585
<dependency>
7686
<groupId>org.slf4j</groupId>
7787
<artifactId>slf4j-api</artifactId>

oref-integration/src/main/java/com/github/pyckle/oref/cli/Main.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.github.pyckle.oref.cli;
22

3+
import com.github.pyckle.oref.integration.DnsUtil;
34
import com.github.pyckle.oref.integration.activealerts.ActiveAlert;
45
import com.github.pyckle.oref.integration.caching.CachedApiResult;
56
import com.github.pyckle.oref.integration.caching.OrefApiCachingService;
@@ -15,6 +16,7 @@
1516
*/
1617
public class Main {
1718
public static void main(String[] args) throws InterruptedException {
19+
DnsUtil.disableDnsCaching();
1820
OrefApiCachingService orefApiCachingService = new OrefApiCachingService(new OrefConfig(new Properties()));
1921
orefApiCachingService.waitForInitialization();
2022
System.out.println("Caches initialized");
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.github.pyckle.oref.integration;
2+
3+
public class DnsUtil
4+
{
5+
/**
6+
* Disable JVM dns caching to ensure we get fresh hosts whenever we resolve oref servers.
7+
* <br>
8+
* This MUST be called on startup before any networking is done. See also: <a
9+
* href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/net/doc-files/net-properties.html">...</a>
10+
*/
11+
public static void disableDnsCaching() {
12+
java.security.Security.setProperty("networkaddress.cache.ttl", "0");
13+
java.security.Security.setProperty("networkaddress.cache.negative.ttl", "0");
14+
}
15+
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package com.github.pyckle.oref.integration.caching;
22

3-
import java.net.http.HttpResponse;
3+
import okhttp3.Response;
44

55
public record ApiResponse<T>(
6-
HttpResponse<?> response,
6+
Response response,
77
T responseObj) {
88
}

oref-integration/src/main/java/com/github/pyckle/oref/integration/caching/CachedApiResult.java

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,10 @@
11
package com.github.pyckle.oref.integration.caching;
22

3-
import com.github.pyckle.oref.integration.translationstores.DistrictStore;
4-
import org.slf4j.Logger;
5-
import org.slf4j.LoggerFactory;
3+
import okhttp3.Headers;
4+
import okhttp3.Response;
65

7-
import java.net.http.HttpHeaders;
8-
import java.net.http.HttpResponse;
96
import java.time.Instant;
10-
import java.time.ZonedDateTime;
11-
import java.time.format.DateTimeFormatter;
12-
import java.time.format.DateTimeParseException;
7+
import java.util.Date;
138
import java.util.Objects;
149
import java.util.Optional;
1510
import java.util.regex.Pattern;
@@ -21,27 +16,26 @@ public record CachedApiResult<T>(Instant localTimestamp,
2116
long maxAge,
2217
T retrievedValue) {
2318

24-
private static final Logger logger = LoggerFactory.getLogger(DistrictStore.class);
2519
private static final String LAST_MODIFIED_HEADER = "Last-Modified";
2620
private static final String SERVER_TIMESTAMP = "Date";
2721
private static final String CACHE_CONTROL = "Cache-Control";
2822
private static final Pattern MAX_AGE_PATTERN = Pattern.compile("max-age=(\\d{1,18})");
2923

3024
public static <T> CachedApiResult<T> buildCachedApiResult(Instant localTime,
31-
HttpResponse<?> response,
25+
Response response,
3226
T retrievedValue) {
3327

34-
int statusCode = response.statusCode();
28+
int statusCode = response.code();
3529
Instant serverTimestamp = parseDate(response.headers(), SERVER_TIMESTAMP);
3630
Instant lastModified = parseDate(response.headers(), LAST_MODIFIED_HEADER);
3731
long maxAge = maxAge(response.headers());
3832

3933
return new CachedApiResult<>(localTime, statusCode, serverTimestamp, lastModified, maxAge, retrievedValue);
4034
}
4135

42-
static long maxAge(HttpHeaders headers) {
43-
var header = headers.firstValue(CACHE_CONTROL);
44-
return maxAge(header);
36+
static long maxAge(Headers headers) {
37+
var header = headers.get(CACHE_CONTROL);
38+
return maxAge(Optional.ofNullable(header));
4539
}
4640

4741
static long maxAge(Optional<String> header) {
@@ -55,18 +49,9 @@ static long maxAge(Optional<String> header) {
5549
}
5650

5751

58-
private static Instant parseDate(HttpHeaders headers, String header) {
59-
var headerDateStr = headers.firstValue(header);
60-
if (headerDateStr.isPresent()) {
61-
try {
62-
ZonedDateTime zdt = ZonedDateTime.parse(headerDateStr.get(), DateTimeFormatter.RFC_1123_DATE_TIME);
63-
return zdt.toInstant();
64-
} catch (DateTimeParseException ex) {
65-
logger.debug("Could not parse date: {}", headerDateStr.get());
66-
logger.trace("Could not parse date", ex);
67-
}
68-
}
69-
return null;
52+
private static Instant parseDate(Headers headers, String header) {
53+
Date date = headers.getDate(header);
54+
return date == null ? null : date.toInstant();
7055
}
7156

7257
/**

oref-integration/src/main/java/com/github/pyckle/oref/integration/caching/OrefApiClient.java

Lines changed: 123 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,60 +4,163 @@
44
import com.google.common.io.CountingInputStream;
55
import com.google.gson.Gson;
66
import com.google.gson.reflect.TypeToken;
7+
import okhttp3.CacheControl;
8+
import okhttp3.ConnectionPool;
9+
import okhttp3.Dns;
10+
import okhttp3.Headers;
11+
import okhttp3.HttpUrl;
12+
import okhttp3.OkHttpClient;
13+
import okhttp3.Request;
14+
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
15+
import org.jetbrains.annotations.NotNull;
716
import org.slf4j.Logger;
817
import org.slf4j.LoggerFactory;
918

19+
import java.io.BufferedReader;
1020
import java.io.IOException;
1121
import java.io.InputStream;
1222
import java.io.InputStreamReader;
13-
import java.net.http.HttpClient;
23+
import java.net.InetAddress;
24+
import java.net.UnknownHostException;
1425
import java.net.http.HttpRequest;
15-
import java.net.http.HttpResponse;
1626
import java.nio.charset.StandardCharsets;
17-
import java.util.zip.GZIPInputStream;
27+
import java.time.Duration;
28+
import java.util.ArrayList;
29+
import java.util.Arrays;
30+
import java.util.Collections;
31+
import java.util.HashMap;
32+
import java.util.List;
33+
import java.util.Map;
1834

1935
public class OrefApiClient {
2036
private static final Logger logger = LoggerFactory.getLogger(OrefApiClient.class);
21-
private static final HttpClient client = HttpClient.newHttpClient();
37+
private static OkHttpClient client = null;
38+
private static ConnectionPool connectionPool;
2239
private static final int SUCCESS_STATUS_CODE = 200;
2340

2441
public static <T> ApiResponse<T> get(HttpRequest req, TypeToken<T> typeToken)
2542
throws IOException, InterruptedException {
2643
Gson gson = new Gson();
2744

2845
Stopwatch stopwatch = Stopwatch.createStarted();
29-
HttpResponse<InputStream> httpResponse = client.send(req, HttpResponse.BodyHandlers.ofInputStream());
30-
31-
// Body stream *MUST* be listed separately because we always want to close it even with there is an error
32-
// creating the gzipReader. Not closing the body stream is a memory leak..
33-
try (CountingInputStream is = new CountingInputStream(httpResponse.body());
34-
InputStream gzipReader = wrapWithGzip(httpResponse, is);
35-
InputStreamReader isr = new InputStreamReader(gzipReader, StandardCharsets.UTF_8)) {
36-
if (httpResponse.statusCode() != SUCCESS_STATUS_CODE) {
37-
logger.warn("Failed http request: {} {} {}", req.uri(), httpResponse.statusCode(), httpResponse.headers());
38-
throw new RuntimeException("Unexpected Status Code " + httpResponse.statusCode());
46+
47+
List<String> allheaders = convertHeadersToOkHttp(req);
48+
var okHttpReq = new Request.Builder()
49+
.cacheControl(new CacheControl.Builder().noCache().build())
50+
.headers(Headers.of(allheaders.toArray(new String[0])))
51+
.url(req.uri().toURL())
52+
.build();
53+
var httpResponse = getHttpClient().newCall(okHttpReq).execute();
54+
logger.debug("Received resp - content-length {} bytes in Request to {}: Response: {} in {}",
55+
httpResponse.body() == null ? -1 : httpResponse.body().contentLength(), req.uri(),
56+
httpResponse.code(), stopwatch.elapsed());
57+
58+
try (var is = httpResponse.body().byteStream();
59+
var wrappedReader = new CountingInputStream(wrapWithGzip(httpResponse.request().url(), httpResponse.headers(), is));
60+
var isr = new BufferedReader(new InputStreamReader(wrappedReader, StandardCharsets.UTF_8))) {
61+
if (httpResponse.code() != SUCCESS_STATUS_CODE) {
62+
logger.warn("Failed http request: {} {} {}", req.uri(), httpResponse.code(),
63+
httpResponse.headers());
64+
throw new RuntimeException("Unexpected Status Code " + httpResponse.code());
3965
} else {
4066
T ret = gson.fromJson(isr, typeToken);
4167

42-
logger.debug("Fetched {} bytes in Request to {}: Response: {} in {}", is.getCount(), req.uri(),
43-
httpResponse.statusCode(), stopwatch.elapsed());
44-
logger.trace("Request to {}: Response: {} {} {}", req, httpResponse.statusCode(),
68+
logger.debug("Decoded {} bytes uncompressed: {} bytes in Request to {}: Response: {} in {}",
69+
httpResponse.body().contentLength(), wrappedReader.getCount(), req.uri(), httpResponse.code(),
70+
stopwatch.elapsed());
71+
logger.trace("Request to {}: Response: {} {} {}", req, httpResponse.code(),
4572
httpResponse.headers(), ret);
4673

4774
return new ApiResponse<>(httpResponse, ret);
4875
}
76+
} catch (Exception ex) {
77+
logger.info("Failed to decode resp {}", httpResponse.request().url());
78+
// evicting from the pool forces a reconnect, hopefully to a different host that is not broken.
79+
connectionPool.evictAll();
80+
throw ex;
81+
}
82+
}
83+
84+
@NotNull
85+
private static List<String> convertHeadersToOkHttp(HttpRequest req)
86+
{
87+
List<String> allheaders = new ArrayList<>();
88+
for (var h : req.headers().map().entrySet())
89+
{
90+
for (var val : h.getValue())
91+
{
92+
allheaders.add(h.getKey());
93+
allheaders.add(val);
94+
}
95+
}
96+
return allheaders;
97+
}
98+
99+
private static synchronized OkHttpClient getHttpClient() {
100+
if (client == null)
101+
{
102+
connectionPool = new ConnectionPool();
103+
client = new OkHttpClient.Builder()
104+
.callTimeout(Duration.ofSeconds(8))
105+
.connectionPool(connectionPool)
106+
.dns(new RotatingDns())
107+
.build();
49108
}
109+
return client;
50110
}
51111

52-
private static InputStream wrapWithGzip(HttpResponse<InputStream> httpResponse, InputStream is) throws IOException {
53-
var contentEncoding = httpResponse.headers().allValues("Content-Encoding");
112+
private static InputStream wrapWithGzip(HttpUrl url, Headers responseHeaders, InputStream is) throws IOException {
113+
var contentEncoding = responseHeaders.toMultimap().get("Content-Encoding");
54114
if (!contentEncoding.isEmpty()) {
55115
if (contentEncoding.size() == 1 && contentEncoding.get(0).equalsIgnoreCase("gzip")) {
56-
is = new GZIPInputStream(is);
116+
logger.debug("response is gzipped encoded {}", url);
117+
return new GzipCompressorInputStream(is);
57118
} else {
58119
throw new IllegalStateException("Unknown content encoding " + contentEncoding);
59120
}
60121
}
122+
logger.debug("response is not gzipped encoded {}", url);
61123
return is;
62124
}
125+
126+
/**
127+
* A dns resolver that ensures that the returned hosts are shuffled and the same host is not returned at the start of the list twice
128+
*/
129+
private static class RotatingDns implements Dns
130+
{
131+
private final Map<String, InetAddress> cachedIps = new HashMap<>();
132+
133+
@NotNull
134+
@Override
135+
public synchronized List<InetAddress> lookup(@NotNull String hostname) throws UnknownHostException
136+
{
137+
try
138+
{
139+
var ips = InetAddress.getAllByName(hostname);
140+
if (ips.length > 1 && cachedIps.containsKey(hostname))
141+
{
142+
var oldFirstAddr = cachedIps.get(hostname);
143+
Collections.shuffle(Arrays.asList(ips));
144+
145+
// make sure that the last first IP we returned is not the first IP again, since that is likely what was used.
146+
if (oldFirstAddr.equals(ips[0]))
147+
{
148+
ips[0] = ips[ips.length - 1];
149+
ips[ips.length - 1] = oldFirstAddr;
150+
}
151+
}
152+
if (ips.length > 0)
153+
{
154+
cachedIps.put(hostname, ips[0]);
155+
}
156+
var ret = List.of(ips);
157+
logger.info("resolved {} to {}", hostname, ret);
158+
return ret;
159+
}
160+
catch (NullPointerException e)
161+
{
162+
throw new UnknownHostException("Broken system behaviour for dns lookup of " + hostname);
163+
}
164+
}
165+
}
63166
}

oref-integration/src/main/java/com/github/pyckle/oref/integration/caching/OrefHttpRequestFactory.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.github.pyckle.oref.integration.caching;
22

33
import java.net.URI;
4+
import java.net.http.HttpClient;
45
import java.net.http.HttpRequest;
56
import java.time.Duration;
67
import java.util.Arrays;
@@ -30,18 +31,23 @@ static HttpRequest buildRequest(URI uri, String... headers) {
3031
}
3132

3233
private static String[] addAcceptEncodingGzip(String[] headers) {
33-
String[] allHeaders = null;
34+
String[] allHeaders = addHeader(headers, acceptEncodingHeader, acceptEncodingGzip);
35+
return allHeaders;
36+
}
3437

38+
private static String[] addHeader(String[] headers, String headerToAdd, String headerVal)
39+
{
40+
String[] allHeaders = null;
3541
for (int i = 0; i < headers.length; i += 2) {
36-
if (headers[i].equalsIgnoreCase(acceptEncodingHeader)) {
42+
if (headers[i].equalsIgnoreCase(headerToAdd)) {
3743
allHeaders = headers;
3844
break;
3945
}
4046
}
4147
if (allHeaders == null) {
4248
allHeaders = Arrays.copyOf(headers, headers.length + 2);
43-
allHeaders[allHeaders.length - 2] = acceptEncodingHeader;
44-
allHeaders[allHeaders.length - 1] = acceptEncodingGzip;
49+
allHeaders[allHeaders.length - 2] = headerToAdd;
50+
allHeaders[allHeaders.length - 1] = headerVal;
4551
}
4652
return allHeaders;
4753
}

oref-swingui/src/main/java/com/github/pyckle/oref/ui/Main.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.github.pyckle.oref.ui;
22

3+
import com.github.pyckle.oref.integration.DnsUtil;
34
import com.github.pyckle.oref.integration.config.OrefConfig;
45

56
import java.util.Arrays;
@@ -9,6 +10,7 @@
910
public class Main {
1011

1112
public static void main(String[] args) {
13+
DnsUtil.disableDnsCaching();
1214
configSlf4j(args);
1315
Properties properties = parseProperties(args);
1416
javax.swing.SwingUtilities.invokeLater(() -> new PekudeiOrefGui(properties).createAndShowGUI());

0 commit comments

Comments
 (0)