Google Pub/Sub Ingress
Nstream provides a robust Google Pub/Sub Adapter library designed to facilitate the ingestion of data from Google Pub/Sub topics directly into Web Agents with minimal configuration required. This adapter simplifies integration, allowing developers to focus on data processing rather than complex ingestion mechanisms.
Prerequisites
Dependencies
To use the Google Pub/Sub Adapter, include the following dependency in your project:
<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-pubsub</artifactId>
  <version>4.13.21</version>
</dependency>
The snippet above is for Maven. If you build with Gradle, declare the same coordinates instead, e.g. implementation 'io.nstream:nstream-adapter-pubsub:4.13.21'.
Glossary
- Ingestion: The process of receiving and transforming messages from Google Pub/Sub into forms usable by Web Agents.
- PubSubIngestingAgent: An abstract Web Agent class with methods specific to managing the ingestion of messages from Google Pub/Sub subscriptions.
- PubSubIngressSettings: A Plain Old Java Object (POJO) that holds configuration details for a PubSubIngestingAgent, facilitating customization of message fetching and handling.
- PubSubIngestingPatch: A concrete extension of PubSubIngestingAgent that processes and transforms received messages into swim.structure.Value objects.
Supported Content Types
The adapter supports ingestion of messages in various formats, such as:
- String: Plain text messages.
- JSON: Standard JSON parsing.
- Avro: Schema-based message decoding via Avro.
- Protobuf: Protobuf-based message handling using dynamic descriptors.
- Binary (octet-stream): Raw binary or base64-encoded messages.
Configuration Overview
Here’s how to configure an agent to receive and process messages from Google Pub/Sub, as depicted in the server.recon
setup:
Note:
- To configure a specific content type, use the contentTypeOverride parameter in your server.recon file.
- It is recommended to create a separate, dedicated subscription for each content type to ensure proper message handling.
- Supported contentTypeOverride values: json, avro, protobuf, octet-stream, and the default, string.
- For protobuf, the protoFilePath parameter is required. For avro, provide the schema with the avroSchemaFilePath parameter.
# server.recon
"provisions": {
  @provision("pubsub-subscriber-provision") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    def: {
      "projectId": "your-google-cloud-project-id",
      "topicName": "your-pubsub-topic-name",
      "subscriptionId": "your-pubsub-subscription-id",
      "subscriberProvisionName": "pubsub-subscriber-provision"
    }
  }
}
"pubsub-adapter": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/ingesting"
    @agent(class: "nstream.adapter.pubsub.PubSubIngestingPatch") {
      pubsubIngressConf: @pubsubIngressSettings {
        firstFetchDelayMillis: 5000,
        fetchIntervalMillis: 20000,
        subscriberProvisionName: "pubsub-subscriber-provision",
        contentTypeOverride: json
        relaySchema: @command {
          nodeUri: "/dynamic/$val+1"
          laneUri: "unused"
          value:
        }
      }
    }
  }
}
# Configure desired web settings (e.g. port)
# ...
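Before deploying, the content-type rules in the note above can be checked mechanically. The sketch below is a hypothetical pre-flight validator in plain Java and is not part of the nstream adapter API. It treats a missing contentTypeOverride as the default, string, rejects unsupported values, and requires protoFilePath for protobuf. Treating avroSchemaFilePath as mandatory for avro is an assumption based on the guidance under Handling Different Content Types.

```java
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical pre-flight check for PubSubIngressSettings values.
 * Not part of the nstream adapter API; it only mirrors the documented rules.
 */
public class ContentTypeCheck {
    // Values documented for contentTypeOverride; "string" is the default.
    static final Set<String> SUPPORTED =
        Set.of("json", "avro", "protobuf", "octet-stream", "string");

    /** Returns an error message, or null if the settings look consistent. */
    static String check(Map<String, String> settings) {
        // A missing contentTypeOverride falls back to the default, string.
        String type = settings.getOrDefault("contentTypeOverride", "string");
        if (!SUPPORTED.contains(type)) {
            return "unsupported contentTypeOverride: " + type;
        }
        if (type.equals("protobuf") && !settings.containsKey("protoFilePath")) {
            return "protobuf requires protoFilePath";
        }
        // Assumption: avro needs a schema path, per the Avro guidance.
        if (type.equals("avro") && !settings.containsKey("avroSchemaFilePath")) {
            return "avro requires avroSchemaFilePath";
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(check(Map.of("contentTypeOverride", "json")));     // null
        System.out.println(check(Map.of("contentTypeOverride", "protobuf"))); // protobuf requires protoFilePath
    }
}
```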
PubSubIngressSettings
Note
: Parameters projectId
, topicName
, and subscriptionId
are defined within the provision configuration.
- projectId: Identifies the Google Cloud project managing the Pub/Sub resources.
- topicName: The Pub/Sub topic to which messages are published.
- subscriptionId: Refers to the subscription used to receive messages.
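Google Cloud identifies Pub/Sub topics and subscriptions by fully qualified resource names of the form projects/PROJECT/topics/TOPIC and projects/PROJECT/subscriptions/SUBSCRIPTION. The sketch below shows how the three provision values map onto those names, which can help when you are cross-checking them in the Google Cloud console or gcloud. The helper names are hypothetical, and the adapter may build these names differently internally.

```java
/**
 * Builds the fully qualified Pub/Sub resource names identified by the
 * projectId, topicName, and subscriptionId provision values.
 * Helper names are hypothetical; only the name format is Google's.
 */
public class PubSubNames {
    static String topicPath(String projectId, String topicName) {
        return "projects/" + projectId + "/topics/" + topicName;
    }

    static String subscriptionPath(String projectId, String subscriptionId) {
        return "projects/" + projectId + "/subscriptions/" + subscriptionId;
    }

    public static void main(String[] args) {
        String project = "your-google-cloud-project-id";
        // projects/your-google-cloud-project-id/topics/your-pubsub-topic-name
        System.out.println(topicPath(project, "your-pubsub-topic-name"));
        // projects/your-google-cloud-project-id/subscriptions/your-pubsub-subscription-id
        System.out.println(subscriptionPath(project, "your-pubsub-subscription-id"));
    }
}
```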
The following PubSubIngressSettings parameters control how messages are fetched and processed:
Parameter | Description | Possible Values | Default Value |
---|---|---|---|
firstFetchDelayMillis | Initial delay before the first fetch (in milliseconds). | Any non-negative integer | 0 |
fetchIntervalMillis | Interval between fetch operations (in milliseconds). | Any positive integer | 10000 |
subscriberProvisionName | Name of the subscriber provision defined in server.recon. | String matching the provision name | N/A |
contentTypeOverride | Content type used to decode messages. | json, avro, protobuf, octet-stream, string | N/A (messages are treated as string) |
contentEncodingOverride | Encoding of binary (octet-stream) content. | base64, none | none |
protoFilePath | (Protobuf only) Path to the Protobuf schema definition. | Valid file path | N/A |
avroSchemaFilePath | (Avro only) Path to the Avro schema definition. | Valid file path | N/A |
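To make the two timing parameters concrete, the sketch below computes when each fetch would occur. It assumes fixed-rate timing, in which fetch n starts at firstFetchDelayMillis + n × fetchIntervalMillis after the agent starts. This is the same schedule that the JDK's ScheduledExecutorService.scheduleAtFixedRate(task, firstFetchDelayMillis, fetchIntervalMillis, TimeUnit.MILLISECONDS) would produce. Whether the adapter measures the interval from the start or from the end of the previous fetch is not stated here, so treat the numbers as approximate.

```java
/**
 * Sketch of the fetch timeline implied by firstFetchDelayMillis and
 * fetchIntervalMillis. ASSUMES fixed-rate scheduling; the adapter's real
 * scheduler may instead measure from the end of the previous fetch.
 */
public class FetchSchedule {
    /** Time (ms after agent start) of the n-th fetch, counting from n = 0. */
    static long fetchTimeMillis(long firstFetchDelayMillis, long fetchIntervalMillis, int n) {
        return firstFetchDelayMillis + (long) n * fetchIntervalMillis;
    }

    public static void main(String[] args) {
        // Values from the server.recon example: first fetch at 5 s, then every 20 s.
        for (int n = 0; n < 3; n++) {
            System.out.println("fetch " + n + " at " + fetchTimeMillis(5000, 20000, n) + " ms");
        }
        // fetch 0 at 5000 ms, fetch 1 at 25000 ms, fetch 2 at 45000 ms
    }
}
```

With the defaults (0 and 10000), the first fetch happens immediately and subsequent fetches follow every 10 seconds.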
Handling Different Content Types
You can configure different content types by setting the contentTypeOverride parameter:
- For Avro, provide the Avro schema reference using the avroSchemaFilePath parameter.
  Example configuration for Avro:
  pubsubIngressConf: @pubsubIngressSettings {
    firstFetchDelayMillis: 5000,
    fetchIntervalMillis: 20000,
    subscriberProvisionName: "pubsub-subscriber-provision",
    contentTypeOverride: avro,
    avroSchemaFilePath: "path/to/avro/schema.avsc"
  }
- For binary (octet-stream) content, you can specify an encoding type (e.g., base64) using the contentEncodingOverride parameter.
  Example configuration for binary (octet-stream):
  pubsubIngressConf: @pubsubIngressSettings {
    firstFetchDelayMillis: 5000,
    fetchIntervalMillis: 20000,
    subscriberProvisionName: "pubsub-subscriber-provision",
    contentTypeOverride: octet-stream,
    contentEncodingOverride: base64
  }
- For Protobuf, use the protoFilePath parameter to specify the schema definition.
  Example configuration for Protobuf:
  pubsubIngressConf: @pubsubIngressSettings {
    firstFetchDelayMillis: 5000,
    fetchIntervalMillis: 20000,
    subscriberProvisionName: "pubsub-subscriber-provision",
    contentTypeOverride: protobuf,
    protoFilePath: "path/to/protobuf/schema.proto"
  }
- For JSON content and string content, no additional parameters are required.
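For octet-stream payloads, contentEncodingOverride distinguishes between a message body that already contains raw bytes and one that contains base64 text wrapping those bytes. The sketch below illustrates that distinction using only java.util.Base64. It assumes that base64 means "decode the body to recover the original bytes" and that none means "use the body as-is". The helper is hypothetical and is not the adapter's implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

/**
 * Illustrates what contentEncodingOverride distinguishes for octet-stream
 * payloads. Hypothetical helper, not the adapter's implementation.
 */
public class OctetStreamDecoding {
    /** Returns the raw bytes carried by a Pub/Sub message body. */
    static byte[] payloadBytes(byte[] body, String contentEncodingOverride) {
        if ("base64".equals(contentEncodingOverride)) {
            // The publisher sent base64 text; decode it back to the original bytes.
            return Base64.getDecoder().decode(body);
        }
        // "none" (the default): the body already is the raw binary payload.
        return body;
    }

    public static void main(String[] args) {
        byte[] original = {(byte) 0xCA, (byte) 0xFE, 0x01};
        byte[] encoded = Base64.getEncoder().encode(original);
        System.out.println(new String(encoded, StandardCharsets.US_ASCII));     // yv4B
        System.out.println(Arrays.equals(original, payloadBytes(encoded, "base64"))); // true
    }
}
```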
Extending Functionality
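You can extend PubSubIngestingPatch to incorporate custom logic for message processing:
import nstream.adapter.pubsub.PubSubIngestingPatch;
import swim.structure.Value;
// DeferrableException ships with the nstream adapter libraries; import it
// from the package used by your nstream-toolkit version.

public class CustomIngestingAgent extends PubSubIngestingPatch {
  @Override
  protected void ingest(Value resultStructure) throws DeferrableException {
    // resultStructure holds the received message transformed into a Value.
    // Add custom processing logic here.
  }
}
To use the custom agent, reference its fully qualified class name in the @agent(class: ...) entry of server.recon in place of nstream.adapter.pubsub.PubSubIngestingPatch.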
Troubleshooting Tips
- Connection Errors: Ensure that your Google Cloud credentials are correctly configured and that your application has the necessary permissions.
- Message Parsing Issues: Verify that your schemas (Avro or Protobuf) match the message formats being published.
- Timeouts: Adjust firstFetchDelayMillis and fetchIntervalMillis to optimal values based on your system's performance.
- Invalid Configuration: Double-check your server.recon file for syntax errors or missing parameters.
Advanced Settings
- Error Handling: The adapter includes built-in error handling for each content type, logging errors and preventing system crashes. Customize error handling by overriding the relevant methods in your CustomIngestingAgent.
- Dynamic Configuration: Supports dynamic adjustments of fetch intervals and processing tactics based on current system load or operational requirements.
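The "log and keep going" behavior described under Error Handling can be pictured as per-message error isolation: a payload that fails to decode is logged and skipped, so one bad message does not halt the rest of the batch. The sketch below is a conceptual illustration in plain Java and does not show the adapter's actual methods. The helper decodeAll is hypothetical, and Integer.parseInt stands in for a real JSON, Avro, or Protobuf decoder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.logging.Logger;

/**
 * Sketch of per-message error isolation: a decoding failure is logged and the
 * message skipped. Hypothetical helper, not the adapter's implementation.
 */
public class ErrorIsolation {
    private static final Logger LOG = Logger.getLogger(ErrorIsolation.class.getName());

    static <T> List<T> decodeAll(List<String> payloads, Function<String, T> decoder) {
        List<T> decoded = new ArrayList<>();
        for (String payload : payloads) {
            try {
                decoded.add(decoder.apply(payload));
            } catch (RuntimeException e) {
                // Log and continue instead of letting the exception propagate.
                LOG.warning("skipping undecodable payload '" + payload + "': " + e);
            }
        }
        return decoded;
    }

    public static void main(String[] args) {
        // Integer.parseInt stands in for a real content-type decoder.
        List<Integer> values = decodeAll(List.of("1", "oops", "3"), Integer::parseInt);
        System.out.println(values); // [1, 3]
    }
}
```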
Additional Resources
Manual Acknowledgements
By default, messages are acknowledged as soon as they are consumed, without waiting for the results of any processing by downstream Web Agents.
Future nstream-toolkit releases will include additional features to enhance control over message acknowledgments. We welcome your feedback and suggestions to improve this functionality.
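The practical consequence of acknowledging messages on consumption can be seen in a small model. If processing fails after a message has been acknowledged, Pub/Sub will not redeliver it. By contrast, a message that is never acknowledged is redelivered once its acknowledgement deadline expires. The sketch below is a purely conceptual comparison of the two orderings in plain Java. The method names are hypothetical, and the adapter does not currently expose manual acknowledgement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/**
 * Conceptual model (not the adapter API) contrasting the default
 * ack-on-consume behaviour with acking only after successful processing.
 */
public class AckTiming {
    /** Default behaviour: returns every consumed message as acknowledged. */
    static List<String> ackOnConsume(List<String> batch, Predicate<String> process) {
        List<String> acked = new ArrayList<>();
        for (String msg : batch) {
            acked.add(msg);    // acknowledged as soon as it is consumed...
            process.test(msg); // ...so the processing outcome is ignored
        }
        return acked;
    }

    /** Alternative: only successfully processed messages are acknowledged. */
    static List<String> ackAfterProcessing(List<String> batch, Predicate<String> process) {
        List<String> acked = new ArrayList<>();
        for (String msg : batch) {
            if (process.test(msg)) {
                acked.add(msg); // unacked messages would be redelivered after the ack deadline
            }
        }
        return acked;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("a", "bad", "c");
        Predicate<String> process = msg -> !msg.equals("bad"); // "bad" fails processing
        System.out.println(ackOnConsume(batch, process));       // [a, bad, c]
        System.out.println(ackAfterProcessing(batch, process)); // [a, c]
    }
}
```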
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).