Pulsar Ingress
Nstream provides a Pulsar Adapter library that greatly facilitates ingestion from Pulsar topics. This guide demonstrates how to consume from Pulsar topics and process responses in Web Agents using minimal boilerplate.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-pulsar:4.15.23'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-pulsar</artifactId>
<version>4.15.23</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Glossary
- ingestion: The act of converting an external response into messages utilized by Web Agents
-
PulsarIngestingAgent:
An abstract Web Agent class with built-in methods relevant to aPulsarConsumer
poll cycle -
PulsarIngressSettings:
A plain old Java object (POJO) that configures a singlePulsarIngestingAgent
-
PulsarIngestingPatch:
A concrete (but extendable)PulsarIngestingAgent
subclass that collectsMessages
intoswim.structure.Values
PulsarIngestingPatch
Conditions
Ideal If:
- A message may be acknowledged as soon as it is consumed
-
ConsumerRecord
bodies are converted intoValues
during ingestion
Then you simply need to use the PulsarIngestingPatch
class, often without overriding any methods.
Let’s demonstrate this by configuring an agent to receive messages from a local Pulsar instance, wholly in server.recon
:
# server.recon
"provisions": {
@provision("pulsarClient") {
class: "nstream.adapter.pulsar.PulsarClientProvision",
def: {
"serviceUrl": "pulsar://localhost:6650",
"connectionsPerBroker": 1
}
},
@provision("consumerConf") {
class: "nstream.adapter.pulsar.PulsarConsumerConfProvision"
def: {
"topicNames": "myTopic",
"subscriptionName": "mySubscription"
}
}
}
vehicle: @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
# PulsarIngestingAgent replacement
@node {
uri: "/pulsar"
@agent(class: "nstream.adapter.pulsar.PulsarIngestingPatch") {
pulsarIngressConf: @pulsarIngressSettings {
clientProvisionName: "pulsarClient"
consumerConfProvisionName: "consumerConf"
contentTypeOverride: "json"
relaySchema: @command {
nodeUri: "/vehicle/$value.id"
laneUri: "addMessage"
value: $value
}
}
}
}
}
# Configure desired web settings (e.g. port)
# ...
Configuration
Client and Consumer
As a first step, a singleton PulsarClientProvision
must be configured so a consumer may be instantiated against it.
The above snippet is minimal, and an exhaustive set of config options may be found here.
The consumer configuration is handled the same way.
PulsarIngressSettings
The previous configurations can be seen as “connection-oriented”. This part is “ingestion-oriented”, responsible for both:
- Linking the client and consumer together
- Dictating what to do given a message
Common Variations
Receiving Complex Data Types
Custom Ingest Logic
If relaying messages to another agent is not sufficient, or you would like to add custom logic on ingestion of messages, the PulsarIngestingPatch
can be easily overridden.
Create a new class that extends the PulsarIngestingPatch
and override the ingest
method.
// CustomIngestingAgent.java
public class CustomIngestingAgent<V> extends PulsarIngestingPatch<V> {
@Override
protected void ingest(Message<V> unstructured) throws DeferrableException {
// Custom ingestion of an individual message
}
}
Remember to update your server.recon
file with the new agent you just created, instead of the PulsarIngestingPatch
.
Manual Acknowledgements
By default, messages are acknowledged as soon as they are consumed, without waiting upon the results of any processing by downstream Web Agents.
Future nstream-toolkit
releases will introduce some convenient abstractions over the most common patterns that require finer control, so stay tuned!
In the meantime, please feel free to share any of your ideas.
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).