Event-driven, asynchronous JSON message processors for Apache HttpComponents 5.0
Apache HttpComponents do not directly provide any support for specific message formats such as XML or JSON.
While it is relatively easy to process HTTP message content with any data processing libraries
that support the classic (blocking) I/O model based on InputStream
and OutputStream
APIs,
there is no such common API for asynchronous data processing in Java 8 or earlier.
It is a very common anti-pattern with asynchronous HTTP content processing when message body content
gets buffered in memory and then processed using standard blocking InputStream
and OutputStream
APIs.
This library aims at making asynchronous processing of JSON messages with Apache HttpCore 5.0 and Apache HttpClient 5.0 simple and convenient while eliminating intermediate content buffering in memory. The library uses the asynchronous JSON message parser of the fantastic Jackson JSON processor to tokenize JSON content and map onto a higher level Java object model.
The library is available in Maven Central with the following coordinates:
<dependency>
<groupId>com.github.ok2c.hc5</groupId>
<artifactId>hc5-async-json</artifactId>
<version>0.3.0</version>
</dependency>
HTTP response message consumers
There are several AsyncResponseConsumer
implementations provided by the library that can be
created using JsonResponseConsumers
factory class.
-
Response data consumers that convert HTTP messages with enclosed JSON content into an object or multiple objects of the same class using Jackson’s
ObjectMapper
- Single data or
JsonNode
object
CloseableHttpAsyncClient client = HttpAsyncClients.createSystem(); client.start(); URI uri = URI.create("http://httpbin.org/get"); JsonFactory factory = new JsonFactory(); ObjectMapper objectMapper = new ObjectMapper(factory); System.out.println("Executing GET " + uri); Future<?> future = client.execute( AsyncRequestBuilder.get(uri).build(), JsonResponseConsumers.create(objectMapper, RequestData.class), new FutureCallback<Message<HttpResponse, RequestData>>() { @Override public void completed(Message<HttpResponse, RequestData> message) { System.out.println("Response status: " + message.getHead().getCode()); System.out.println(message.getBody()); } @Override public void failed(Exception ex) { ex.printStackTrace(System.out); } @Override public void cancelled() { } }); future.get(); client.shutdown(CloseMode.GRACEFUL);
Stdout>
Executing GET http://httpbin.org/get Response status: 200 RequestData{id=0, url=http://httpbin.org/get, origin='xxx.xxx.xxx.xxx', headers={Connection=close, Host=httpbin.org, User-Agent=Apache-HttpAsyncClient/5.0-beta3 (Java/1.8.0_181)}, args={}}
- Sequence of data objects of the same class
CloseableHttpAsyncClient client = HttpAsyncClients.createSystem(); client.start(); URI uri = URI.create("http://httpbin.org/stream/5"); JsonFactory factory = new JsonFactory(); ObjectMapper objectMapper = new ObjectMapper(factory); System.out.println("Executing GET " + uri); Future<?> future = client.execute( AsyncRequestBuilder.get(uri).build(), JsonResponseConsumers.create( objectMapper, RequestData.class, messageHead -> System.out.println("Response status: " + messageHead.getCode()), requestData -> System.out.println(requestData)), new FutureCallback<Long>() { @Override public void completed(Long count) { System.out.println("Objects received: " + count); } @Override public void failed(Exception ex) { ex.printStackTrace(System.out); } @Override public void cancelled() { } }); future.get(); client.shutdown(CloseMode.GRACEFUL);
Stdout>
Executing GET http://httpbin.org/stream/5 Response status: 200 RequestData{id=0, url=http://httpbin.org/stream/5, origin='xxx.xxx.xxx.xxx', headers={Host=httpbin.org, Connection=close, User-Agent=Apache-HttpAsyncClient/5.0-beta3 (Java/1.8.0_181)}, args={}} RequestData{id=1, url=http://httpbin.org/stream/5, origin='xxx.xxx.xxx.xxx', headers={Host=httpbin.org, Connection=close, User-Agent=Apache-HttpAsyncClient/5.0-beta3 (Java/1.8.0_181)}, args={}} RequestData{id=2, url=http://httpbin.org/stream/5, origin='xxx.xxx.xxx.xxx', headers={Host=httpbin.org, Connection=close, User-Agent=Apache-HttpAsyncClient/5.0-beta3 (Java/1.8.0_181)}, args={}} RequestData{id=3, url=http://httpbin.org/stream/5, origin='xxx.xxx.xxx.xxx', headers={Host=httpbin.org, Connection=close, User-Agent=Apache-HttpAsyncClient/5.0-beta3 (Java/1.8.0_181)}, args={}} RequestData{id=4, url=http://httpbin.org/stream/5, origin='xxx.xxx.xxx.xxx', headers={Host=httpbin.org, Connection=close, User-Agent=Apache-HttpAsyncClient/5.0-beta3 (Java/1.8.0_181)}, args={}} Objects received: 5
- Single data or
-
Request data consumer that converts JSON messages a sequence of token evens. This is the most efficient way of processing JSON content supported by the library. It enables partial consumption of JSON structure without creating a full representation of incoming messages.
CloseableHttpAsyncClient client = HttpAsyncClients.createSystem(); client.start(); URI uri = URI.create("http://httpbin.org/get"); JsonFactory factory = new JsonFactory(); System.out.println("Executing GET " + uri); Future<?> future = client.execute( AsyncRequestBuilder.get(uri).build(), JsonResponseConsumers.create( factory, messageHead -> System.out.println("Response status: " + messageHead.getCode()), new JsonTokenEventHandler() { @Override public void objectStart() { System.out.print("object start/"); } @Override public void objectEnd() { System.out.print("object end/"); } @Override public void arrayStart() { System.out.print("array start/"); } @Override public void arrayEnd() { System.out.print("array end/"); } @Override public void field(String name) { System.out.print(name + "="); } @Override public void embeddedObject(Object object) { System.out.print(object + "/"); } @Override public void value(String value) { System.out.print("\"" + value + "\"/"); } @Override public void value(int value) { System.out.print(value + "/"); } @Override public void value(double value) { System.out.print(value + "/"); } @Override public void value(boolean value) { System.out.print(value + "/"); } @Override public void valueNull() { System.out.print("null/"); } @Override public void endOfStream() { System.out.println("stream end/"); } }), new FutureCallback<Void>() { @Override public void completed(Void input) { System.out.println("Object received"); } @Override public void failed(Exception ex) { ex.printStackTrace(System.out); } @Override public void cancelled() { } }); future.get(); client.shutdown(CloseMode.GRACEFUL);
Stdout>
Response status: 200 object start/args=object start/object end/headers=object start/Connection="close"/Host="httpbin.org"/User-Agent="Apache-HttpAsyncClient/5.0-beta3 (Java/1.8.0_181)"/object end/origin="xxx.xxx.xxx.xxx"/url="http://httpbin.org/get"/object end/stream end/ Object received
HTTP request message producers
There are several AsyncRequestProducer
implementations provided by the library that can be
created using JsonRequestProducers
factory class.
-
Request producers that generate HTTP request messages with enclosed JSON content from a single JSON object or multiple JSON objects of the same class using Jackson’s
ObjectMapper
- Single data or
JsonNode
object
CloseableHttpAsyncClient client = HttpAsyncClients.createSystem(); client.start(); URI uri = URI.create("http://httpbin.org/post"); JsonFactory factory = new JsonFactory(); ObjectMapper objectMapper = new ObjectMapper(factory); objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); System.out.println("Executing POST " + uri); Future<?> future = client.execute( JsonRequestProducers.create(HttpRequests.POST.create(uri), new BasicNameValuePair("name", "value"), objectMapper), JsonResponseConsumers.create(objectMapper, RequestData.class), new FutureCallback<Message<HttpResponse, RequestData>>() { @Override public void completed(Message<HttpResponse, RequestData> message) { System.out.println("Response status: " + message.getHead().getCode()); System.out.println(message.getBody()); } @Override public void failed(Exception ex) { ex.printStackTrace(System.out); } @Override public void cancelled() { } }); future.get(); client.shutdown(CloseMode.GRACEFUL);
Stdout>
Executing POST http://httpbin.org/post Response status: 200 RequestData{id=0, url=http://httpbin.org/post, origin='xxx.xxx.xxx.xxx', headers={Connection=close, Content-Type=application/json; charset=UTF-8, Host=httpbin.org, Transfer-Encoding=chunked, User-Agent=Apache-HttpAsyncClient/5.0-beta3 (Java/1.8.0_181)}, args={}, data='{"name":"name","value":"value"}', json={"name":"name","value":"value"}}
- Sequence of data objects of the same class
The
ObjectChannel
passed to the message producer is fully threading safe and can be used concurrently by worker threads to generate and submit JSON objects to the channel asynchronously as more data become available.CloseableHttpAsyncClient client = HttpAsyncClients.createSystem(); client.start(); URI uri = URI.create("http://httpbin.org/post"); JsonFactory factory = new JsonFactory(); ObjectMapper objectMapper = new ObjectMapper(factory); objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); System.out.println("Executing POST " + uri); Future<?> future = client.execute( JsonRequestProducers.create(HttpRequests.POST.create(uri), objectMapper, channel -> { channel.write(new BasicNameValuePair("name1", "value1")); channel.write(new BasicNameValuePair("name2", "value2")); channel.write(new BasicNameValuePair("name3", "value3")); channel.endStream(); }), JsonResponseConsumers.create(objectMapper, RequestData.class), new FutureCallback<Message<HttpResponse, RequestData>>() { @Override public void completed(Message<HttpResponse, RequestData> message) { System.out.println("Response status: " + message.getHead().getCode()); System.out.println(message.getBody()); } @Override public void failed(Exception ex) { ex.printStackTrace(System.out); } @Override public void cancelled() { } }); future.get(); client.shutdown(CloseMode.GRACEFUL);
Stdout>
Response status: 200 Shutting down RequestData{id=0, url=http://httpbin.org/post, origin='xxx.xxx.xxx.xxx', headers={Connection=close, Content-Type=application/json; charset=UTF-8, Host=httpbin.org, Transfer-Encoding=chunked, User-Agent=Apache-HttpAsyncClient/5.0-beta3 (Java/1.8.0_181)}, args={}, data='{"name":"name1","value":"value1"}{"name":"name2","value":"value2"}{"name":"name3","value":"value3"}', json=null}
- Single data or
General purpose event driven bulk JSON readers
JsonBulkArrayReader
is a general-purpose event driven JSOM reader that can be used to read large arrays
of objects while buffering only a single array element in memory. JsonBulkArrayReader
has no dependency on
HttpComponents APIs.
JsonFactory factory = new JsonFactory();
ObjectMapper objectMapper = new ObjectMapper(factory);
URL resource = JsonBulkArrayReaderExample.class.getResource("/sample6.json");
JsonBulkArrayReader bulkArrayReader = new JsonBulkArrayReader(objectMapper);
bulkArrayReader.initialize(new TypeReference<RequestData>() {}, entry -> System.out.println(entry));
try (InputStream inputStream = resource.openStream()) {
int l;
byte[] tmp = new byte[4096];
while ((l = inputStream.read(tmp)) != -1) {
bulkArrayReader.consume(ByteBuffer.wrap(tmp, 0, l));
}
bulkArrayReader.streamEnd();
}