diff --git a/README.md b/README.md index 45c0fa1d9..14289c27b 100644 --- a/README.md +++ b/README.md @@ -394,7 +394,7 @@ Different transport protocols can be configured with specific settings using spe ##### JSON-RPC Transport Configuration -For the JSON-RPC transport, to use the default `JdkA2AHttpClient`, provide a `JSONRPCTransportConfig` created with its default constructor. +For the JSON-RPC transport, to use the default HTTP client (resolved automatically by `A2AHttpClientFactory`), provide a `JSONRPCTransportConfig` created with its default constructor. To use a custom HTTP client implementation, simply create a `JSONRPCTransportConfig` as follows: @@ -441,7 +441,7 @@ Client client = Client ##### HTTP+JSON/REST Transport Configuration -For the HTTP+JSON/REST transport, if you'd like to use the default `JdkA2AHttpClient`, provide a `RestTransportConfig` created with its default constructor. +For the HTTP+JSON/REST transport, to use the default HTTP client (resolved automatically by `A2AHttpClientFactory`), provide a `RestTransportConfig` created with its default constructor. To use a custom HTTP client implementation, simply create a `RestTransportConfig` as follows: diff --git a/client/base/src/main/java/io/a2a/A2A.java b/client/base/src/main/java/io/a2a/A2A.java index d64cdaa27..7762eb2d2 100644 --- a/client/base/src/main/java/io/a2a/A2A.java +++ b/client/base/src/main/java/io/a2a/A2A.java @@ -3,11 +3,10 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.UUID; import io.a2a.client.http.A2ACardResolver; import io.a2a.client.http.A2AHttpClient; -import io.a2a.client.http.JdkA2AHttpClient; +import io.a2a.client.http.A2AHttpClientFactory; import io.a2a.spec.A2AClientError; import io.a2a.spec.A2AClientJSONError; import io.a2a.spec.AgentCard; @@ -139,7 +138,7 @@ private static Message toMessage(List> parts, Message.Role role, String * @throws A2AClientJSONError If the response body cannot be decoded as JSON or validated against the AgentCard schema */ public static AgentCard getAgentCard(String agentUrl) throws A2AClientError, A2AClientJSONError { - return getAgentCard(new JdkA2AHttpClient(), agentUrl); + return getAgentCard(A2AHttpClientFactory.create(), agentUrl); } /** @@ -167,7 +166,7 @@ public static AgentCard getAgentCard(A2AHttpClient httpClient, String agentUrl) * @throws A2AClientJSONError If the response body cannot be decoded as JSON or validated against the AgentCard schema */ public static AgentCard getAgentCard(String agentUrl, String relativeCardPath, Map authHeaders) throws A2AClientError, A2AClientJSONError { - return getAgentCard(new JdkA2AHttpClient(), agentUrl, relativeCardPath, authHeaders); + return getAgentCard(A2AHttpClientFactory.create(), agentUrl, relativeCardPath, authHeaders); } /** diff --git a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java index 92e6d86b9..188f83b50 100644 --- a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java +++ b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java @@ -14,8 +14,8 @@ import io.a2a.client.transport.spi.interceptors.ClientCallInterceptor; import io.a2a.client.transport.spi.interceptors.PayloadAndHeaders; import io.a2a.client.http.A2AHttpClient; +import io.a2a.client.http.A2AHttpClientFactory; import io.a2a.client.http.A2AHttpResponse; -import io.a2a.client.http.JdkA2AHttpClient; import io.a2a.client.transport.spi.ClientTransport; import io.a2a.spec.A2AClientError; import io.a2a.spec.A2AClientException; @@ -84,7 +84,7 @@ public JSONRPCTransport(AgentCard agentCard) { public JSONRPCTransport(A2AHttpClient httpClient, AgentCard agentCard, String agentUrl, List interceptors) { - this.httpClient = httpClient == null ? new JdkA2AHttpClient() : httpClient; + this.httpClient = httpClient == null ? A2AHttpClientFactory.create() : httpClient; this.agentCard = agentCard; this.agentUrl = agentUrl; this.interceptors = interceptors; diff --git a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportConfigBuilder.java b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportConfigBuilder.java index 64153620f..ef27ac43c 100644 --- a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportConfigBuilder.java +++ b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportConfigBuilder.java @@ -1,7 +1,7 @@ package io.a2a.client.transport.jsonrpc; import io.a2a.client.http.A2AHttpClient; -import io.a2a.client.http.JdkA2AHttpClient; +import io.a2a.client.http.A2AHttpClientFactory; import io.a2a.client.transport.spi.ClientTransportConfigBuilder; public class JSONRPCTransportConfigBuilder extends ClientTransportConfigBuilder { @@ -18,7 +18,7 @@ public JSONRPCTransportConfigBuilder httpClient(A2AHttpClient httpClient) { public JSONRPCTransportConfig build() { // No HTTP client provided, fallback to the default one (JDK-based implementation) if (httpClient == null) { - httpClient = new JdkA2AHttpClient(); + httpClient = A2AHttpClientFactory.create(); } JSONRPCTransportConfig config = new JSONRPCTransportConfig(httpClient); diff --git a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportProvider.java b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportProvider.java index 97c22866a..75403515c 100644 --- a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportProvider.java +++ b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransportProvider.java @@ -1,6 +1,6 @@ package io.a2a.client.transport.jsonrpc; -import io.a2a.client.http.JdkA2AHttpClient; +import io.a2a.client.http.A2AHttpClientFactory; import io.a2a.client.transport.spi.ClientTransportProvider; import io.a2a.spec.A2AClientException; import io.a2a.spec.AgentCard; @@ -11,7 +11,7 @@ public class JSONRPCTransportProvider implements ClientTransportProvider interceptors) { - this.httpClient = httpClient == null ? new JdkA2AHttpClient() : httpClient; + this.httpClient = httpClient == null ? A2AHttpClientFactory.create() : httpClient; this.agentCard = agentCard; this.agentUrl = agentUrl.endsWith("/") ? agentUrl.substring(0, agentUrl.length() - 1) : agentUrl; this.interceptors = interceptors; diff --git a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfigBuilder.java b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfigBuilder.java index 68150f189..01d97f27c 100644 --- a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfigBuilder.java +++ b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportConfigBuilder.java @@ -1,7 +1,7 @@ package io.a2a.client.transport.rest; import io.a2a.client.http.A2AHttpClient; -import io.a2a.client.http.JdkA2AHttpClient; +import io.a2a.client.http.A2AHttpClientFactory; import io.a2a.client.transport.spi.ClientTransportConfigBuilder; import org.jspecify.annotations.Nullable; @@ -16,9 +16,8 @@ public RestTransportConfigBuilder httpClient(A2AHttpClient httpClient) { @Override public RestTransportConfig build() { - // No HTTP client provided, fallback to the default one (JDK-based implementation) if (httpClient == null) { - httpClient = new JdkA2AHttpClient(); + httpClient = A2AHttpClientFactory.create(); } RestTransportConfig config = new RestTransportConfig(httpClient); diff --git a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportProvider.java b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportProvider.java index 99d155968..f9a178c5c 100644 --- a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportProvider.java +++ b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransportProvider.java @@ -1,6 +1,6 @@ package io.a2a.client.transport.rest; -import io.a2a.client.http.JdkA2AHttpClient; +import io.a2a.client.http.A2AHttpClientFactory; import io.a2a.client.transport.spi.ClientTransportProvider; import io.a2a.spec.A2AClientException; import io.a2a.spec.AgentCard; @@ -17,9 +17,9 @@ public String getTransportProtocol() { public RestTransport create(RestTransportConfig clientTransportConfig, AgentCard agentCard, String agentUrl) throws A2AClientException { RestTransportConfig transportConfig = clientTransportConfig; if (transportConfig == null) { - transportConfig = new RestTransportConfig(new JdkA2AHttpClient()); + transportConfig = new RestTransportConfig(A2AHttpClientFactory.create()); } - return new RestTransport(clientTransportConfig.getHttpClient(), agentCard, agentUrl, transportConfig.getInterceptors()); + return new RestTransport(transportConfig.getHttpClient(), agentCard, agentUrl, transportConfig.getInterceptors()); } @Override diff --git a/extras/http-client-android/pom.xml b/extras/http-client-android/pom.xml new file mode 100644 index 000000000..0b87a5466 --- /dev/null +++ b/extras/http-client-android/pom.xml @@ -0,0 +1,48 @@ + + + 4.0.0 + + + io.github.a2asdk + a2a-java-sdk-parent + 0.3.4.Beta1-SNAPSHOT + ../../pom.xml + + a2a-java-sdk-http-client-android + jar + + Java SDK A2A HTTP Client: Android + Java SDK for the Agent2Agent Protocol (A2A) - Android HTTP Client + + + + ${project.groupId} + a2a-java-sdk-http-client + + + ${project.groupId} + a2a-java-sdk-spec + + + + ${project.groupId} + a2a-java-sdk-http-client + test-jar + test + + + org.junit.jupiter + junit-jupiter-api + test + + + + org.mock-server + mockserver-netty + test + + + + diff --git a/extras/http-client-android/src/main/java/io/a2a/client/http/AndroidA2AHttpClient.java b/extras/http-client-android/src/main/java/io/a2a/client/http/AndroidA2AHttpClient.java new file mode 100644 index 000000000..c023f083d --- /dev/null +++ b/extras/http-client-android/src/main/java/io/a2a/client/http/AndroidA2AHttpClient.java @@ -0,0 +1,290 @@ +package io.a2a.client.http; + +import static java.net.HttpURLConnection.HTTP_FORBIDDEN; +import static java.net.HttpURLConnection.HTTP_MULT_CHOICE; +import static java.net.HttpURLConnection.HTTP_OK; +import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED; + +import io.a2a.common.A2AErrorMessages; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.function.Consumer; + +/** Android-specific implementation of {@link A2AHttpClient} using {@link HttpURLConnection}. */ +public class AndroidA2AHttpClient implements A2AHttpClient { + + private static final Executor NET_EXECUTOR = Executors.newCachedThreadPool(r -> { + Thread t = new Thread(r, "A2A-Android-Net"); + t.setDaemon(true); + return t; + }); + + @Override + public GetBuilder createGet() { + return new AndroidGetBuilder(); + } + + @Override + public PostBuilder createPost() { + return new AndroidPostBuilder(); + } + + @Override + public DeleteBuilder createDelete() { + return new AndroidDeleteBuilder(); + } + + private abstract static class AndroidBuilder> implements Builder { + protected String url = ""; + protected Map headers = new HashMap<>(); + + @Override + public T url(String url) { + this.url = url; + return self(); + } + + @Override + public T addHeader(String name, String value) { + headers.put(name, value); + return self(); + } + + @Override + public T addHeaders(Map headers) { + if (headers != null) { + this.headers.putAll(headers); + } + return self(); + } + + @SuppressWarnings("unchecked") + protected T self() { + return (T) this; + } + + protected HttpURLConnection createConnection(String method, boolean isSSE) throws IOException { + URL urlObj; + try { + urlObj = new URI(url).toURL(); + } catch (URISyntaxException e) { + throw new MalformedURLException("Invalid URL: " + url); + } + HttpURLConnection connection = (HttpURLConnection) urlObj.openConnection(); + connection.setRequestMethod(method); + connection.setConnectTimeout(15000); // 15 seconds + connection.setReadTimeout(60000); // 60 seconds + for (Map.Entry header : headers.entrySet()) { + connection.setRequestProperty(header.getKey(), header.getValue()); + } + if (isSSE) { + connection.setRequestProperty("Accept", "text/event-stream"); + } + return connection; + } + + protected static String readStreamWithLimit(InputStream is) throws IOException { + if (is == null) { + return ""; + } + int maxResponseSize = 10 * 1024 * 1024; // 10 MB + try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { + StringBuilder sb = new StringBuilder(); + String line; + boolean first = true; + while ((line = reader.readLine()) != null) { + if (sb.length() + line.length() > maxResponseSize) { + throw new IOException("Response size exceeds limit"); + } + if (!first) { + sb.append('\n'); + } + sb.append(line); + first = false; + } + return sb.toString(); + } + } + + protected A2AHttpResponse execute(HttpURLConnection connection) throws IOException { + int status = connection.getResponseCode(); + String body = ""; + try (InputStream is = + (status >= HTTP_OK && status < HTTP_MULT_CHOICE) + ? connection.getInputStream() + : connection.getErrorStream()) { + body = readStreamWithLimit(is); + } + + if (status == HTTP_UNAUTHORIZED) { + throw new IOException(A2AErrorMessages.AUTHENTICATION_FAILED); + } else if (status == HTTP_FORBIDDEN) { + throw new IOException(A2AErrorMessages.AUTHORIZATION_FAILED); + } + + return new AndroidHttpResponse(status, body); + } + + protected void processSSEResponse( + HttpURLConnection connection, + Consumer messageConsumer, + Consumer errorConsumer, + Runnable completeRunnable) { + try { + int status = connection.getResponseCode(); + if (status != HTTP_OK) { + if (status == HTTP_UNAUTHORIZED) { + errorConsumer.accept(new IOException(A2AErrorMessages.AUTHENTICATION_FAILED)); + return; + } else if (status == HTTP_FORBIDDEN) { + errorConsumer.accept(new IOException(A2AErrorMessages.AUTHORIZATION_FAILED)); + return; + } + + String errorBody = ""; + try (InputStream es = connection.getErrorStream()) { + errorBody = readStreamWithLimit(es); + } + errorConsumer.accept( + new IOException("Request failed with status " + status + ":" + errorBody)); + return; + } + + try (InputStream is = connection.getInputStream(); + BufferedReader reader = + new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + if (line.startsWith("data:")) { + String data = line.substring(5).trim(); + if (!data.isEmpty()) { + messageConsumer.accept(data); + } + } + } + completeRunnable.run(); + } + } catch (Exception e) { + errorConsumer.accept(e); + } finally { + connection.disconnect(); + } + } + + protected CompletableFuture executeAsyncSSE( + HttpURLConnection connection, + Consumer messageConsumer, + Consumer errorConsumer, + Runnable completeRunnable) { + return CompletableFuture.runAsync( + () -> processSSEResponse(connection, messageConsumer, errorConsumer, completeRunnable), + NET_EXECUTOR); + } + } + + private static class AndroidGetBuilder extends AndroidBuilder implements GetBuilder { + @Override + public A2AHttpResponse get() throws IOException { + HttpURLConnection connection = createConnection("GET", false); + try { + return execute(connection); + } catch (IOException e) { + connection.disconnect(); + throw e; + } + } + + @Override + public CompletableFuture getAsyncSSE( + Consumer messageConsumer, + Consumer errorConsumer, + Runnable completeRunnable) + throws IOException { + HttpURLConnection connection = createConnection("GET", true); + return executeAsyncSSE(connection, messageConsumer, errorConsumer, completeRunnable); + } + } + + private static class AndroidPostBuilder extends AndroidBuilder + implements PostBuilder { + private String body = ""; + + @Override + public PostBuilder body(String body) { + this.body = body; + return this; + } + + @Override + public A2AHttpResponse post() throws IOException { + HttpURLConnection connection = createConnection("POST", false); + connection.setDoOutput(true); + try { + try (OutputStream os = connection.getOutputStream()) { + os.write(body.getBytes(StandardCharsets.UTF_8)); + } + return execute(connection); + } catch (IOException e) { + connection.disconnect(); + throw e; + } + } + + @Override + public CompletableFuture postAsyncSSE( + Consumer messageConsumer, + Consumer errorConsumer, + Runnable completeRunnable) + throws IOException { + HttpURLConnection connection = createConnection("POST", true); + connection.setDoOutput(true); + + return CompletableFuture.runAsync( + () -> { + try { + try (OutputStream os = connection.getOutputStream()) { + os.write(body.getBytes(StandardCharsets.UTF_8)); + } + processSSEResponse(connection, messageConsumer, errorConsumer, completeRunnable); + } catch (Exception e) { + errorConsumer.accept(e); + } + }, NET_EXECUTOR); + } + } + + private static class AndroidDeleteBuilder extends AndroidBuilder + implements DeleteBuilder { + @Override + public A2AHttpResponse delete() throws IOException { + HttpURLConnection connection = createConnection("DELETE", false); + try { + return execute(connection); + } catch (IOException e) { + connection.disconnect(); + throw e; + } + } + } + + private record AndroidHttpResponse(int status, String body) implements A2AHttpResponse { + @Override + public boolean success() { + return status >= HTTP_OK && status < HTTP_MULT_CHOICE; + } + } +} diff --git a/extras/http-client-android/src/main/java/io/a2a/client/http/AndroidA2AHttpClientProvider.java b/extras/http-client-android/src/main/java/io/a2a/client/http/AndroidA2AHttpClientProvider.java new file mode 100644 index 000000000..1a0f8d372 --- /dev/null +++ b/extras/http-client-android/src/main/java/io/a2a/client/http/AndroidA2AHttpClientProvider.java @@ -0,0 +1,22 @@ +package io.a2a.client.http; + +/** + * Service provider for {@link AndroidA2AHttpClient}. + */ +public final class AndroidA2AHttpClientProvider implements A2AHttpClientProvider { + + @Override + public A2AHttpClient create() { + return new AndroidA2AHttpClient(); + } + + @Override + public int priority() { + return 100; // Higher priority than JDK + } + + @Override + public String name() { + return "android"; + } +} diff --git a/extras/http-client-android/src/main/resources/META-INF/services/io.a2a.client.http.A2AHttpClientProvider b/extras/http-client-android/src/main/resources/META-INF/services/io.a2a.client.http.A2AHttpClientProvider new file mode 100644 index 000000000..7829103c4 --- /dev/null +++ b/extras/http-client-android/src/main/resources/META-INF/services/io.a2a.client.http.A2AHttpClientProvider @@ -0,0 +1 @@ +io.a2a.client.http.AndroidA2AHttpClientProvider diff --git a/extras/http-client-android/src/test/java/io/a2a/client/http/AndroidA2AHttpClientFactoryTest.java b/extras/http-client-android/src/test/java/io/a2a/client/http/AndroidA2AHttpClientFactoryTest.java new file mode 100644 index 000000000..afa39584f --- /dev/null +++ b/extras/http-client-android/src/test/java/io/a2a/client/http/AndroidA2AHttpClientFactoryTest.java @@ -0,0 +1,41 @@ +package io.a2a.client.http; + +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import org.junit.jupiter.api.Test; + +public class AndroidA2AHttpClientFactoryTest { + + @Test + public void testCreateReturnsAndroidClient() { + // When both JDK and Android are on classpath, Android should be preferred due to higher priority (100) + A2AHttpClient client = A2AHttpClientFactory.create(); + assertNotNull(client); + assertInstanceOf(AndroidA2AHttpClient.class, client, + "Factory should return AndroidA2AHttpClient when Android provider is available"); + } + + @Test + public void testCreateWithAndroidProviderName() { + A2AHttpClient client = A2AHttpClientFactory.create("android"); + assertNotNull(client); + assertInstanceOf(AndroidA2AHttpClient.class, client, + "Factory should return AndroidA2AHttpClient when 'android' provider is requested"); + } + + @Test + public void testAndroidClientIsUsable() { + A2AHttpClient client = A2AHttpClientFactory.create("android"); + assertNotNull(client); + + // Verify we can create builders + A2AHttpClient.GetBuilder getBuilder = client.createGet(); + assertNotNull(getBuilder, "Should be able to create GET builder"); + + A2AHttpClient.PostBuilder postBuilder = client.createPost(); + assertNotNull(postBuilder, "Should be able to create POST builder"); + + A2AHttpClient.DeleteBuilder deleteBuilder = client.createDelete(); + assertNotNull(deleteBuilder, "Should be able to create DELETE builder"); + } +} diff --git a/extras/http-client-android/src/test/java/io/a2a/client/http/AndroidA2AHttpClientIntegrationTest.java b/extras/http-client-android/src/test/java/io/a2a/client/http/AndroidA2AHttpClientIntegrationTest.java new file mode 100644 index 000000000..b5a7dac85 --- /dev/null +++ b/extras/http-client-android/src/test/java/io/a2a/client/http/AndroidA2AHttpClientIntegrationTest.java @@ -0,0 +1,9 @@ +package io.a2a.client.http; + +public class AndroidA2AHttpClientIntegrationTest extends AbstractA2AHttpClientIntegrationTest { + + @Override + protected A2AHttpClient createClient() { + return new AndroidA2AHttpClient(); + } +} diff --git a/extras/http-client-android/src/test/java/io/a2a/client/http/AndroidA2AHttpClientSSETest.java b/extras/http-client-android/src/test/java/io/a2a/client/http/AndroidA2AHttpClientSSETest.java new file mode 100644 index 000000000..cd842f9c4 --- /dev/null +++ b/extras/http-client-android/src/test/java/io/a2a/client/http/AndroidA2AHttpClientSSETest.java @@ -0,0 +1,9 @@ +package io.a2a.client.http; + +public class AndroidA2AHttpClientSSETest extends AbstractA2AHttpClientSSETest { + + @Override + protected A2AHttpClient createClient() { + return new AndroidA2AHttpClient(); + } +} diff --git a/extras/http-client-android/src/test/java/io/a2a/client/http/AndroidA2AHttpClientTest.java b/extras/http-client-android/src/test/java/io/a2a/client/http/AndroidA2AHttpClientTest.java new file mode 100644 index 000000000..e611fa541 --- /dev/null +++ b/extras/http-client-android/src/test/java/io/a2a/client/http/AndroidA2AHttpClientTest.java @@ -0,0 +1,59 @@ +package io.a2a.client.http; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import org.junit.jupiter.api.Test; + +public class AndroidA2AHttpClientTest { + + @Test + public void testNoArgsConstructor() { + AndroidA2AHttpClient client = new AndroidA2AHttpClient(); + assertNotNull(client); + } + + @Test + public void testCreateGet() { + AndroidA2AHttpClient client = new AndroidA2AHttpClient(); + A2AHttpClient.GetBuilder builder = client.createGet(); + assertNotNull(builder); + } + + @Test + public void testCreatePost() { + AndroidA2AHttpClient client = new AndroidA2AHttpClient(); + A2AHttpClient.PostBuilder builder = client.createPost(); + assertNotNull(builder); + } + + @Test + public void testCreateDelete() { + AndroidA2AHttpClient client = new AndroidA2AHttpClient(); + A2AHttpClient.DeleteBuilder builder = client.createDelete(); + assertNotNull(builder); + } + + @Test + public void testBuilderUrlSetting() { + AndroidA2AHttpClient client = new AndroidA2AHttpClient(); + A2AHttpClient.GetBuilder builder = client.createGet(); + A2AHttpClient.GetBuilder result = builder.url("https://example.com"); + assertSame(builder, result, "Builder should return itself for method chaining"); + } + + @Test + public void testBuilderHeaderSetting() { + AndroidA2AHttpClient client = new AndroidA2AHttpClient(); + A2AHttpClient.GetBuilder builder = client.createGet(); + A2AHttpClient.GetBuilder result = builder.addHeader("Accept", "application/json"); + assertSame(builder, result, "Builder should return itself for method chaining"); + } + + @Test + public void testPostBuilderBody() { + AndroidA2AHttpClient client = new AndroidA2AHttpClient(); + A2AHttpClient.PostBuilder builder = client.createPost(); + A2AHttpClient.PostBuilder result = builder.body("{\"key\":\"value\"}"); + assertSame(builder, result, "Builder should return itself for method chaining"); + } +} diff --git a/extras/http-client-android/src/test/java/io/a2a/client/http/JdkA2AHttpClientIntegrationTest.java b/extras/http-client-android/src/test/java/io/a2a/client/http/JdkA2AHttpClientIntegrationTest.java new file mode 100644 index 000000000..ed68795ea --- /dev/null +++ b/extras/http-client-android/src/test/java/io/a2a/client/http/JdkA2AHttpClientIntegrationTest.java @@ -0,0 +1,9 @@ +package io.a2a.client.http; + +public class JdkA2AHttpClientIntegrationTest extends AbstractA2AHttpClientIntegrationTest { + + @Override + protected A2AHttpClient createClient() { + return new JdkA2AHttpClient(); + } +} diff --git a/extras/http-client-android/src/test/java/io/a2a/client/http/JdkA2AHttpClientSSETest.java b/extras/http-client-android/src/test/java/io/a2a/client/http/JdkA2AHttpClientSSETest.java new file mode 100644 index 000000000..19dfd5f84 --- /dev/null +++ b/extras/http-client-android/src/test/java/io/a2a/client/http/JdkA2AHttpClientSSETest.java @@ -0,0 +1,9 @@ +package io.a2a.client.http; + +public class JdkA2AHttpClientSSETest extends AbstractA2AHttpClientSSETest { + + @Override + protected A2AHttpClient createClient() { + return new JdkA2AHttpClient(); + } +} diff --git a/http-client/pom.xml b/http-client/pom.xml index 9b7567af3..7909ba46f 100644 --- a/http-client/pom.xml +++ b/http-client/pom.xml @@ -35,4 +35,24 @@ + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + **/Abstract*.class + + + + + + + \ No newline at end of file diff --git a/http-client/src/main/java/io/a2a/client/http/A2ACardResolver.java b/http-client/src/main/java/io/a2a/client/http/A2ACardResolver.java index 22af7c615..4d4449e13 100644 --- a/http-client/src/main/java/io/a2a/client/http/A2ACardResolver.java +++ b/http-client/src/main/java/io/a2a/client/http/A2ACardResolver.java @@ -21,20 +21,23 @@ public class A2ACardResolver { /** * Get the agent card for an A2A agent. - * The {@code JdkA2AHttpClient} will be used to fetch the agent card. + * The {@link A2AHttpClientFactory#create()} will be used to fetch the agent + * card if available. * - * @param baseUrl the base URL for the agent whose agent card we want to retrieve + * @param baseUrl the base URL for the agent whose agent card we want to + * retrieve * @throws A2AClientError if the URL for the agent is invalid */ public A2ACardResolver(String baseUrl) throws A2AClientError { - this(new JdkA2AHttpClient(), baseUrl, null, null); + this(A2AHttpClientFactory.create(), baseUrl, null, null); } /** * Constructs an A2ACardResolver with a specific HTTP client and base URL. * * @param httpClient the http client to use - * @param baseUrl the base URL for the agent whose agent card we want to retrieve + * @param baseUrl the base URL for the agent whose agent card we want to + * retrieve * @throws A2AClientError if the URL for the agent is invalid */ public A2ACardResolver(A2AHttpClient httpClient, String baseUrl) throws A2AClientError { @@ -42,10 +45,12 @@ public A2ACardResolver(A2AHttpClient httpClient, String baseUrl) throws A2AClien } /** - * @param httpClient the http client to use - * @param baseUrl the base URL for the agent whose agent card we want to retrieve - * @param agentCardPath optional path to the agent card endpoint relative to the base - * agent URL, defaults to ".well-known/agent-card.json" + * @param httpClient the http client to use + * @param baseUrl the base URL for the agent whose agent card we want to + * retrieve + * @param agentCardPath optional path to the agent card endpoint relative to the + * base + * agent URL, defaults to ".well-known/agent-card.json" * @throws A2AClientError if the URL for the agent is invalid */ public A2ACardResolver(A2AHttpClient httpClient, String baseUrl, String agentCardPath) throws A2AClientError { @@ -53,17 +58,21 @@ public A2ACardResolver(A2AHttpClient httpClient, String baseUrl, String agentCar } /** - * @param httpClient the http client to use - * @param baseUrl the base URL for the agent whose agent card we want to retrieve - * @param agentCardPath optional path to the agent card endpoint relative to the base - * agent URL, defaults to ".well-known/agent-card.json" - * @param authHeaders the HTTP authentication headers to use. May be {@code null} + * @param httpClient the http client to use + * @param baseUrl the base URL for the agent whose agent card we want to + * retrieve + * @param agentCardPath optional path to the agent card endpoint relative to the + * base + * agent URL, defaults to ".well-known/agent-card.json" + * @param authHeaders the HTTP authentication headers to use. May be + * {@code null} * @throws A2AClientError if the URL for the agent is invalid */ public A2ACardResolver(A2AHttpClient httpClient, String baseUrl, @Nullable String agentCardPath, - @Nullable Map authHeaders) throws A2AClientError { + @Nullable Map authHeaders) throws A2AClientError { this.httpClient = httpClient; - String effectiveAgentCardPath = agentCardPath == null || agentCardPath.isEmpty() ? DEFAULT_AGENT_CARD_PATH : agentCardPath; + String effectiveAgentCardPath = agentCardPath == null || agentCardPath.isEmpty() ? DEFAULT_AGENT_CARD_PATH + : agentCardPath; try { this.url = new URI(baseUrl).resolve(effectiveAgentCardPath).toString(); } catch (URISyntaxException e) { @@ -76,8 +85,9 @@ public A2ACardResolver(A2AHttpClient httpClient, String baseUrl, @Nullable Strin * Get the agent card for the configured A2A agent. * * @return the agent card - * @throws A2AClientError If an HTTP error occurs fetching the card - * @throws A2AClientJSONError If the response body cannot be decoded as JSON or validated against the AgentCard schema + * @throws A2AClientError If an HTTP error occurs fetching the card + * @throws A2AClientJSONError If the response body cannot be decoded as JSON or + * validated against the AgentCard schema */ public AgentCard getAgentCard() throws A2AClientError, A2AClientJSONError { A2AHttpClient.GetBuilder builder = httpClient.createGet() @@ -109,5 +119,4 @@ public AgentCard getAgentCard() throws A2AClientError, A2AClientJSONError { } - } diff --git a/http-client/src/main/java/io/a2a/client/http/A2AHttpClientFactory.java b/http-client/src/main/java/io/a2a/client/http/A2AHttpClientFactory.java new file mode 100644 index 000000000..0ea791b6e --- /dev/null +++ b/http-client/src/main/java/io/a2a/client/http/A2AHttpClientFactory.java @@ -0,0 +1,52 @@ +package io.a2a.client.http; + +import java.util.Comparator; +import java.util.List; +import java.util.ServiceLoader; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * Factory for creating {@link A2AHttpClient} instances using the ServiceLoader mechanism. + */ +public final class A2AHttpClientFactory { + + private static final List PROVIDERS; + + static { + ServiceLoader loader = ServiceLoader.load(A2AHttpClientProvider.class); + PROVIDERS = StreamSupport.stream(loader.spliterator(), false) + .collect(Collectors.toList()); + } + + private A2AHttpClientFactory() { + // Utility class + } + + /** + * Creates a new A2AHttpClient instance using the highest priority provider available. + * If no providers are found, it throws an {@link IllegalStateException}. + */ + public static A2AHttpClient create() { + return PROVIDERS.stream() + .max(Comparator.comparingInt(A2AHttpClientProvider::priority)) + .map(A2AHttpClientProvider::create) + .orElseThrow(() -> new IllegalStateException("No A2AHttpClientProvider found")); + } + + /** + * Creates a new A2AHttpClient instance using a specific provider by name. + */ + public static A2AHttpClient create(String providerName) { + if (providerName == null || providerName.isEmpty()) { + throw new IllegalArgumentException("Provider name must not be null or empty"); + } + + return PROVIDERS.stream() + .filter(provider -> providerName.equals(provider.name())) + .findFirst() + .map(A2AHttpClientProvider::create) + .orElseThrow(() -> new IllegalArgumentException( + "No A2AHttpClientProvider found with name: " + providerName)); + } +} diff --git a/http-client/src/main/java/io/a2a/client/http/A2AHttpClientProvider.java b/http-client/src/main/java/io/a2a/client/http/A2AHttpClientProvider.java new file mode 100644 index 000000000..0ededf4b6 --- /dev/null +++ b/http-client/src/main/java/io/a2a/client/http/A2AHttpClientProvider.java @@ -0,0 +1,23 @@ +package io.a2a.client.http; + +/** + * Provider interface for creating {@link A2AHttpClient} instances. + */ +public interface A2AHttpClientProvider { + /** + * Creates a new A2AHttpClient instance. + */ + A2AHttpClient create(); + + /** + * Returns the priority of this provider. Higher priority providers are preferred. + */ + default int priority() { + return 0; + } + + /** + * Returns the name of this provider. + */ + String name(); +} diff --git a/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClient.java b/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClient.java index 9b8003741..51b8f1d85 100644 --- a/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClient.java +++ b/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClient.java @@ -91,6 +91,14 @@ protected HttpRequest.Builder createRequestBuilder() throws IOException { return builder; } + protected void checkAuthErrors(HttpResponse response) throws IOException { + if (response.statusCode() == HTTP_UNAUTHORIZED) { + throw new IOException(A2AErrorMessages.AUTHENTICATION_FAILED); + } else if (response.statusCode() == HTTP_FORBIDDEN) { + throw new IOException(A2AErrorMessages.AUTHORIZATION_FAILED); + } + } + protected CompletableFuture asyncRequest( HttpRequest request, Consumer messageConsumer, @@ -210,6 +218,7 @@ public A2AHttpResponse get() throws IOException, InterruptedException { .build(); HttpResponse response = httpClient.send(request, BodyHandlers.ofString(StandardCharsets.UTF_8)); + checkAuthErrors(response); return new JdkHttpResponse(response); } @@ -232,6 +241,7 @@ public A2AHttpResponse delete() throws IOException, InterruptedException { HttpRequest request = super.createRequestBuilder().DELETE().build(); HttpResponse response = httpClient.send(request, BodyHandlers.ofString(StandardCharsets.UTF_8)); + checkAuthErrors(response); return new JdkHttpResponse(response); } @@ -263,11 +273,7 @@ public A2AHttpResponse post() throws IOException, InterruptedException { HttpResponse response = httpClient.send(request, BodyHandlers.ofString(StandardCharsets.UTF_8)); - if (response.statusCode() == HTTP_UNAUTHORIZED) { - throw new IOException(A2AErrorMessages.AUTHENTICATION_FAILED); - } else if (response.statusCode() == HTTP_FORBIDDEN) { - throw new IOException(A2AErrorMessages.AUTHORIZATION_FAILED); - } + checkAuthErrors(response); return new JdkHttpResponse(response); } diff --git a/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClientProvider.java b/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClientProvider.java new file mode 100644 index 000000000..b512e49ef --- /dev/null +++ b/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClientProvider.java @@ -0,0 +1,22 @@ +package io.a2a.client.http; + +/** + * Service provider for {@link JdkA2AHttpClient}. + */ +public final class JdkA2AHttpClientProvider implements A2AHttpClientProvider { + + @Override + public A2AHttpClient create() { + return new JdkA2AHttpClient(); + } + + @Override + public int priority() { + return 0; // Lowest priority - fallback + } + + @Override + public String name() { + return "jdk"; + } +} diff --git a/http-client/src/main/resources/META-INF/services/io.a2a.client.http.A2AHttpClientProvider b/http-client/src/main/resources/META-INF/services/io.a2a.client.http.A2AHttpClientProvider new file mode 100644 index 000000000..78dbb361e --- /dev/null +++ b/http-client/src/main/resources/META-INF/services/io.a2a.client.http.A2AHttpClientProvider @@ -0,0 +1 @@ +io.a2a.client.http.JdkA2AHttpClientProvider diff --git a/http-client/src/test/java/io/a2a/client/http/A2ACardResolverTest.java b/http-client/src/test/java/io/a2a/client/http/A2ACardResolverTest.java index 9c2a177ec..3acad3b4f 100644 --- a/http-client/src/test/java/io/a2a/client/http/A2ACardResolverTest.java +++ b/http-client/src/test/java/io/a2a/client/http/A2ACardResolverTest.java @@ -174,4 +174,9 @@ public GetBuilder addHeaders(Map headers) { } } + @Test + public void testFactoryCreate() { + A2AHttpClient client = A2AHttpClientFactory.create(); + assertTrue(client instanceof JdkA2AHttpClient); + } } diff --git a/http-client/src/test/java/io/a2a/client/http/AbstractA2AHttpClientIntegrationTest.java b/http-client/src/test/java/io/a2a/client/http/AbstractA2AHttpClientIntegrationTest.java new file mode 100644 index 000000000..90cd899e8 --- /dev/null +++ b/http-client/src/test/java/io/a2a/client/http/AbstractA2AHttpClientIntegrationTest.java @@ -0,0 +1,216 @@ +package io.a2a.client.http; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; + +import io.a2a.common.A2AErrorMessages; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockserver.integration.ClientAndServer; + +import java.io.IOException; + +public abstract class AbstractA2AHttpClientIntegrationTest { + + private ClientAndServer mockServer; + private A2AHttpClient client; + + protected abstract A2AHttpClient createClient(); + + @BeforeEach + public void setup() { + mockServer = ClientAndServer.startClientAndServer(0); + client = createClient(); + } + + @AfterEach + public void teardown() { + if (mockServer != null) { + mockServer.stop(); + } + } + + private String getBaseUrl() { + return "http://localhost:" + mockServer.getPort(); + } + + @Test + public void testGetRequestSuccess() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/test")) + .respond(response().withStatusCode(200).withBody("success")); + + A2AHttpResponse response = client.createGet() + .url(getBaseUrl() + "/test") + .get(); + + assertEquals(200, response.status()); + assertTrue(response.success()); + assertEquals("success", response.body()); + } + + @Test + public void testPostRequestSuccess() throws Exception { + mockServer + .when(request() + .withMethod("POST") + .withPath("/test") + .withBody("{\"key\":\"value\"}")) + .respond(response().withStatusCode(201).withBody("created")); + + A2AHttpResponse response = client.createPost() + .url(getBaseUrl() + "/test") + .body("{\"key\":\"value\"}") + .post(); + + assertEquals(201, response.status()); + assertTrue(response.success()); + assertEquals("created", response.body()); + } + + @Test + public void testDeleteRequestSuccess() throws Exception { + mockServer + .when(request().withMethod("DELETE").withPath("/test")) + .respond(response().withStatusCode(204)); + + A2AHttpResponse response = client.createDelete() + .url(getBaseUrl() + "/test") + .delete(); + + assertEquals(204, response.status()); + assertTrue(response.success()); + } + + @Test + public void test401AuthenticationErrorOnGet() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/test")) + .respond(response().withStatusCode(401)); + + Exception exception = assertThrows(IOException.class, () -> { + client.createGet() + .url(getBaseUrl() + "/test") + .get(); + }); + + assertEquals(A2AErrorMessages.AUTHENTICATION_FAILED, exception.getMessage()); + } + + @Test + public void test403AuthorizationErrorOnGet() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/test")) + .respond(response().withStatusCode(403)); + + Exception exception = assertThrows(IOException.class, () -> { + client.createGet() + .url(getBaseUrl() + "/test") + .get(); + }); + + assertEquals(A2AErrorMessages.AUTHORIZATION_FAILED, exception.getMessage()); + } + + @Test + public void test401AuthenticationErrorOnPost() throws Exception { + mockServer + .when(request().withMethod("POST").withPath("/test")) + .respond(response().withStatusCode(401)); + + Exception exception = assertThrows(IOException.class, () -> { + client.createPost() + .url(getBaseUrl() + "/test") + .body("{}") + .post(); + }); + + assertEquals(A2AErrorMessages.AUTHENTICATION_FAILED, exception.getMessage()); + } + + @Test + public void test403AuthorizationErrorOnPost() throws Exception { + mockServer + .when(request().withMethod("POST").withPath("/test")) + .respond(response().withStatusCode(403)); + + Exception exception = assertThrows(IOException.class, () -> { + client.createPost() + .url(getBaseUrl() + "/test") + .body("{}") + .post(); + }); + + assertEquals(A2AErrorMessages.AUTHORIZATION_FAILED, exception.getMessage()); + } + + @Test + public void test401AuthenticationErrorOnDelete() throws Exception { + mockServer + .when(request().withMethod("DELETE").withPath("/test")) + .respond(response().withStatusCode(401)); + + Exception exception = assertThrows(IOException.class, () -> { + client.createDelete() + .url(getBaseUrl() + "/test") + .delete(); + }); + + assertEquals(A2AErrorMessages.AUTHENTICATION_FAILED, exception.getMessage()); + } + + @Test + public void testHeaderPropagation() throws Exception { + mockServer + .when(request() + .withMethod("GET") + .withPath("/test") + .withHeader("Authorization", "Bearer token") + .withHeader("X-Custom-Header", "custom-value")) + .respond(response().withStatusCode(200).withBody("ok")); + + A2AHttpResponse response = client.createGet() + .url(getBaseUrl() + "/test") + .addHeader("Authorization", "Bearer token") + .addHeader("X-Custom-Header", "custom-value") + .get(); + + assertEquals(200, response.status()); + assertEquals("ok", response.body()); + } + + @Test + public void testNonSuccessStatusCode() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/test")) + .respond(response().withStatusCode(500).withBody("Internal Server Error")); + + A2AHttpResponse response = client.createGet() + .url(getBaseUrl() + "/test") + .get(); + + assertEquals(500, response.status()); + assertFalse(response.success()); + assertEquals("Internal Server Error", response.body()); + } + + @Test + public void test404NotFound() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/test")) + .respond(response().withStatusCode(404).withBody("Not Found")); + + A2AHttpResponse response = client.createGet() + .url(getBaseUrl() + "/test") + .get(); + + assertEquals(404, response.status()); + assertFalse(response.success()); + assertEquals("Not Found", response.body()); + } +} diff --git a/http-client/src/test/java/io/a2a/client/http/AbstractA2AHttpClientSSETest.java b/http-client/src/test/java/io/a2a/client/http/AbstractA2AHttpClientSSETest.java new file mode 100644 index 000000000..2b9a0f349 --- /dev/null +++ b/http-client/src/test/java/io/a2a/client/http/AbstractA2AHttpClientSSETest.java @@ -0,0 +1,255 @@ +package io.a2a.client.http; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; + +import io.a2a.common.A2AErrorMessages; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockserver.integration.ClientAndServer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +public abstract class AbstractA2AHttpClientSSETest { + + private ClientAndServer mockServer; + private A2AHttpClient client; + + protected abstract A2AHttpClient createClient(); + + @BeforeEach + public void setup() { + mockServer = ClientAndServer.startClientAndServer(0); + client = createClient(); + } + + @AfterEach + public void teardown() { + if (mockServer != null) { + mockServer.stop(); + } + } + + private String getBaseUrl() { + return "http://localhost:" + mockServer.getPort(); + } + + @Test + public void testGetAsyncSSE() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/sse")) + .respond(response() + .withStatusCode(200) + .withHeader("Content-Type", "text/event-stream") + .withBody("data: event1\n\ndata: event2\n\ndata: event3\n\n")); + + CountDownLatch latch = new CountDownLatch(1); + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + + client.createGet() + .url(getBaseUrl() + "/sse") + .getAsyncSSE( + events::add, + error::set, + latch::countDown + ); + + assertTrue(latch.await(5, TimeUnit.SECONDS), "Expected completion handler to be called"); + assertNull(error.get(), "Expected no errors"); + assertEquals(3, events.size(), "Expected to receive 3 events"); + assertTrue(events.contains("event1")); + assertTrue(events.contains("event2")); + assertTrue(events.contains("event3")); + } + + @Test + public void testPostAsyncSSE() throws Exception { + mockServer + .when(request() + .withMethod("POST") + .withPath("/sse") + .withBody("{\"subscribe\":true}")) + .respond(response() + .withStatusCode(200) + .withHeader("Content-Type", "text/event-stream") + .withBody("data: message1\n\ndata: message2\n\n")); + + CountDownLatch latch = new CountDownLatch(1); + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + + client.createPost() + .url(getBaseUrl() + "/sse") + .body("{\"subscribe\":true}") + .postAsyncSSE( + events::add, + error::set, + latch::countDown + ); + + assertTrue(latch.await(5, TimeUnit.SECONDS), "Expected completion handler to be called"); + assertNull(error.get(), "Expected no errors"); + assertEquals(2, events.size(), "Expected to receive 2 events"); + assertTrue(events.contains("message1")); + assertTrue(events.contains("message2")); + } + + @Test + public void testSSEDataPrefixStripping() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/sse")) + .respond(response() + .withStatusCode(200) + .withHeader("Content-Type", "text/event-stream") + .withBody("data: content here\n\ndata:no space\n\ndata: extra spaces \n\n")); + + CountDownLatch latch = new CountDownLatch(1); + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + + client.createGet() + .url(getBaseUrl() + "/sse") + .getAsyncSSE( + events::add, + error::set, + latch::countDown + ); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertNull(error.get()); + assertTrue(events.contains("content here"), "Should have stripped 'data: ' prefix"); + assertTrue(events.contains("no space"), "Should handle 'data:' without space"); + assertTrue(events.contains("extra spaces"), "Should trim whitespace"); + } + + @Test + public void testSSEAuthenticationError() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/sse")) + .respond(response().withStatusCode(401)); + + CountDownLatch errorLatch = new CountDownLatch(1); + AtomicReference error = new AtomicReference<>(); + AtomicBoolean completed = new AtomicBoolean(false); + + client.createGet() + .url(getBaseUrl() + "/sse") + .getAsyncSSE( + msg -> {}, + e -> { + error.set(e); + errorLatch.countDown(); + }, + () -> completed.set(true) + ); + + assertTrue(errorLatch.await(5, TimeUnit.SECONDS), "Expected error handler to be called"); + assertNotNull(error.get(), "Expected an error"); + assertTrue(error.get() instanceof IOException, "Expected IOException"); + assertTrue(error.get().getMessage().contains(A2AErrorMessages.AUTHENTICATION_FAILED), + "Expected authentication error message but got: " + error.get().getMessage()); + assertFalse(completed.get(), "Should not call completion handler on error"); + } + + @Test + public void testSSEAuthorizationError() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/sse")) + .respond(response().withStatusCode(403)); + + CountDownLatch errorLatch = new CountDownLatch(1); + AtomicReference error = new AtomicReference<>(); + AtomicBoolean completed = new AtomicBoolean(false); + + client.createGet() + .url(getBaseUrl() + "/sse") + .getAsyncSSE( + msg -> {}, + e -> { + error.set(e); + errorLatch.countDown(); + }, + () -> completed.set(true) + ); + + assertTrue(errorLatch.await(5, TimeUnit.SECONDS), "Expected error handler to be called"); + assertNotNull(error.get(), "Expected an error"); + assertTrue(error.get() instanceof IOException, "Expected IOException"); + assertTrue(error.get().getMessage().contains(A2AErrorMessages.AUTHORIZATION_FAILED), + "Expected authorization error message but got: " + error.get().getMessage()); + assertFalse(completed.get(), "Should not call completion handler on error"); + } + + @Test + public void testSSEEmptyLinesIgnored() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/sse")) + .respond(response() + .withStatusCode(200) + .withHeader("Content-Type", "text/event-stream") + .withBody("data: first\n\n\n\ndata: second\n\ndata: \n\ndata: third\n\n")); + + CountDownLatch latch = new CountDownLatch(1); + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + + client.createGet() + .url(getBaseUrl() + "/sse") + .getAsyncSSE( + events::add, + error::set, + latch::countDown + ); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertNull(error.get()); + assertEquals(3, events.size(), "Should have received 3 non-empty events"); + assertTrue(events.contains("first")); + assertTrue(events.contains("second")); + assertTrue(events.contains("third")); + } + + @Test + public void testSSEHeaderPropagation() throws Exception { + mockServer + .when(request() + .withMethod("GET") + .withPath("/sse") + .withHeader("Accept", "text/event-stream") + .withHeader("Authorization", "Bearer token")) + .respond(response() + .withStatusCode(200) + .withHeader("Content-Type", "text/event-stream") + .withBody("data: authenticated\n\n")); + + CountDownLatch latch = new CountDownLatch(1); + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + + client.createGet() + .url(getBaseUrl() + "/sse") + .addHeader("Authorization", "Bearer token") + .getAsyncSSE( + events::add, + error::set, + latch::countDown + ); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertNull(error.get()); + assertTrue(events.contains("authenticated")); + } +} diff --git a/http-client/src/test/java/io/a2a/client/http/JdkA2AHttpClientIntegrationTest.java b/http-client/src/test/java/io/a2a/client/http/JdkA2AHttpClientIntegrationTest.java new file mode 100644 index 000000000..ed68795ea --- /dev/null +++ b/http-client/src/test/java/io/a2a/client/http/JdkA2AHttpClientIntegrationTest.java @@ -0,0 +1,9 @@ +package io.a2a.client.http; + +public class JdkA2AHttpClientIntegrationTest extends AbstractA2AHttpClientIntegrationTest { + + @Override + protected A2AHttpClient createClient() { + return new JdkA2AHttpClient(); + } +} diff --git a/http-client/src/test/java/io/a2a/client/http/JdkA2AHttpClientSSETest.java b/http-client/src/test/java/io/a2a/client/http/JdkA2AHttpClientSSETest.java new file mode 100644 index 000000000..19dfd5f84 --- /dev/null +++ b/http-client/src/test/java/io/a2a/client/http/JdkA2AHttpClientSSETest.java @@ -0,0 +1,9 @@ +package io.a2a.client.http; + +public class JdkA2AHttpClientSSETest extends AbstractA2AHttpClientSSETest { + + @Override + protected A2AHttpClient createClient() { + return new JdkA2AHttpClient(); + } +} diff --git a/pom.xml b/pom.xml index 83cff974b..110e27ee7 100644 --- a/pom.xml +++ b/pom.xml @@ -288,6 +288,13 @@ test ${project.version} + + ${project.groupId} + a2a-java-sdk-http-client + test-jar + test + ${project.version} + org.jspecify jspecify @@ -454,6 +461,7 @@ extras/task-store-database-jpa extras/push-notification-config-store-database-jpa extras/queue-manager-replicated + extras/http-client-android http-client integrations/microprofile-config reference/common diff --git a/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java b/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java index 9601e6b79..ca9092890 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java +++ b/server-common/src/main/java/io/a2a/server/tasks/BasePushNotificationSender.java @@ -10,11 +10,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import io.a2a.json.JsonProcessingException; - import io.a2a.client.http.A2AHttpClient; -import io.a2a.client.http.JdkA2AHttpClient; import io.a2a.json.JsonUtil; +import io.a2a.client.http.A2AHttpClientFactory; import io.a2a.spec.PushNotificationConfig; import io.a2a.spec.Task; @@ -31,8 +29,7 @@ public class BasePushNotificationSender implements PushNotificationSender { @Inject public BasePushNotificationSender(PushNotificationConfigStore configStore) { - this.httpClient = new JdkA2AHttpClient(); - this.configStore = configStore; + this(configStore, A2AHttpClientFactory.create()); } public BasePushNotificationSender(PushNotificationConfigStore configStore, A2AHttpClient httpClient) { @@ -56,11 +53,12 @@ public void sendNotification(Task task) { .allMatch(CompletableFuture::join)); try { boolean allSent = dispatchResult.get(); - if (! allSent) { + if (!allSent) { LOGGER.warn("Some push notifications failed to send for taskId: " + task.getId()); } } catch (InterruptedException | ExecutionException e) { - LOGGER.warn("Some push notifications failed to send for taskId " + task.getId() + ": {}", e.getMessage(), e); + LOGGER.warn("Some push notifications failed to send for taskId " + task.getId() + ": {}", e.getMessage(), + e); } }