Google Pub/Sub Ingress

Nstream provides a Google Pub/Sub Adapter library that ingests data from Google Pub/Sub topics directly into Web Agents with minimal configuration. The adapter handles the ingestion mechanics, so developers can focus on processing the data.

Prerequisites

Dependencies

To use the Google Pub/Sub Adapter, include the following dependency in your project:

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-pubsub</artifactId>
  <version>4.13.21</version>
</dependency>

The snippet above is for Maven. If you build with Gradle, declare the same coordinates instead, for example: implementation 'io.nstream:nstream-adapter-pubsub:4.13.21'.

Glossary

Supported Content Types

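The adapter can ingest messages in several formats: JSON, Avro, Protobuf, raw bytes (octet-stream), and plain strings. Select the format with the contentTypeOverride setting described below.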

Configuration Overview

The following server.recon configuration sets up an agent that receives and processes messages from Google Pub/Sub:


# server.recon
"provisions": {
  @provision("pubsub-subscriber-provision") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    def: {
      "projectId": "your-google-cloud-project-id",
      "topicName": "your-pubsub-topic-name",
      "subscriptionId": "your-pubsub-subscription-id",
      "subscriberProvisionName": "pubsub-subscriber-provision"
    }
  }
}

"pubsub-adapter": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/ingesting"
    @agent(class: "nstream.adapter.pubsub.PubSubIngestingPatch") {
      pubsubIngressConf: @pubsubIngressSettings {
        firstFetchDelayMillis: 5000,
        fetchIntervalMillis: 20000,
        subscriberProvisionName: "pubsub-subscriber-provision",
        contentTypeOverride: json,
        relaySchema: @command {
          nodeUri: "/dynamic/$val+1"
          laneUri: "unused"
          value:
        }
      }
    }
  }
}

# Configure desired web settings (e.g. port)
# ...
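Google Cloud addresses a subscription by its fully qualified resource name, projects/{project}/subscriptions/{subscription}. As a minimal sketch of how the provision's projectId and subscriptionId relate to that name, the hypothetical SubscriptionNames helper below (not part of the Nstream API) assembles it; whether the adapter builds the name the same way internally is an assumption.

```java
// Hypothetical helper, not part of the Nstream adapter API: shows how the
// provision's projectId and subscriptionId map onto Google's resource name.
public final class SubscriptionNames {

  private SubscriptionNames() {}

  // Builds "projects/{projectId}/subscriptions/{subscriptionId}",
  // rejecting blank inputs so a misconfigured provision fails fast.
  public static String subscriptionPath(String projectId, String subscriptionId) {
    if (projectId == null || projectId.isBlank()) {
      throw new IllegalArgumentException("projectId must be set");
    }
    if (subscriptionId == null || subscriptionId.isBlank()) {
      throw new IllegalArgumentException("subscriptionId must be set");
    }
    return "projects/" + projectId + "/subscriptions/" + subscriptionId;
  }

  public static void main(String[] args) {
    // Values copied from the sample provision above.
    System.out.println(subscriptionPath("your-google-cloud-project-id",
        "your-pubsub-subscription-id"));
    // prints projects/your-google-cloud-project-id/subscriptions/your-pubsub-subscription-id
  }
}
```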

PubSubIngressSettings

Note: Parameters projectId, topicName, and subscriptionId are defined within the provision configuration.

These settings dictate how messages are fetched and processed:

firstFetchDelayMillis: Delay before the first fetch, in milliseconds. Possible values: any non-negative integer. Default: 0.
fetchIntervalMillis: Interval between fetches, in milliseconds. Possible values: any positive integer. Default: 10000.
subscriberProvisionName: Name of the subscriber provision declared in server.recon. Possible values: a string matching the provision name. Default: N/A.
contentTypeOverride: Content type used to interpret each message. Possible values: json, avro, protobuf, octet-stream, string. Default: string.
contentEncodingOverride: Encoding applied to binary content. Possible values: base64, none. Default: none.
protoFilePath: Protobuf only. Path to the Protobuf schema definition. Possible values: a valid file path. Default: N/A.
avroSchemaFilePath: Avro only. Path to the Avro schema definition. Possible values: a valid file path. Default: N/A.
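To make the two timing parameters concrete, here is a minimal sketch that lists when fetches would occur. It assumes the first fetch runs firstFetchDelayMillis after startup and later fetches repeat every fetchIntervalMillis on a fixed schedule; the FetchSchedule class is hypothetical, and the adapter's actual scheduler may measure intervals differently (for example, from the end of the previous fetch).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the timing implied by firstFetchDelayMillis and
// fetchIntervalMillis. Assumes a fixed schedule measured from startup;
// the adapter's real scheduler may differ.
public final class FetchSchedule {

  private FetchSchedule() {}

  // Returns the offsets (ms after agent start) of the first `count` fetches.
  public static List<Long> fetchOffsets(long firstFetchDelayMillis,
                                        long fetchIntervalMillis, int count) {
    if (firstFetchDelayMillis < 0 || fetchIntervalMillis <= 0 || count < 0) {
      throw new IllegalArgumentException("invalid schedule parameters");
    }
    List<Long> offsets = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      offsets.add(firstFetchDelayMillis + i * fetchIntervalMillis);
    }
    return offsets;
  }

  public static void main(String[] args) {
    // Values from the sample server.recon: 5000 ms delay, 20000 ms interval.
    System.out.println(fetchOffsets(5000, 20000, 4)); // [5000, 25000, 45000, 65000]
  }
}
```

With the sample values the first four fetches land at 5000, 25000, 45000, and 65000 ms; with the defaults (0 and 10000) they land at 0, 10000, 20000, and so on.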

Handling Different Content Types

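To switch formats, set contentTypeOverride in pubsubIngressSettings to json, avro, protobuf, octet-stream, or string. For Avro and Protobuf, also point avroSchemaFilePath or protoFilePath, respectively, at the corresponding schema file.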
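The hypothetical PayloadDecoding class below sketches how contentEncodingOverride and contentTypeOverride might combine: the transport encoding (base64 or none) is undone first, then the bytes are interpreted according to the content type. It is an illustration under those assumptions, not the adapter's code, and it models only string and octet-stream, since json, avro, and protobuf need real parsers.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical sketch, not the adapter's implementation: illustrates how the
// encoding and content-type settings could shape a raw Pub/Sub payload.
public final class PayloadDecoding {

  private PayloadDecoding() {}

  // Step 1: undo the transport encoding ("base64" decodes, "none" passes through).
  public static byte[] applyEncoding(byte[] raw, String contentEncoding) {
    switch (contentEncoding) {
      case "base64": return Base64.getDecoder().decode(raw);
      case "none": return raw;
      default: throw new IllegalArgumentException("unknown encoding: " + contentEncoding);
    }
  }

  // Step 2: interpret the decoded bytes according to the content type.
  public static Object interpret(byte[] bytes, String contentType) {
    switch (contentType) {
      case "string": return new String(bytes, StandardCharsets.UTF_8);
      case "octet-stream": return bytes;
      default: throw new UnsupportedOperationException(contentType + " is not modeled here");
    }
  }

  public static void main(String[] args) {
    byte[] raw = "aGVsbG8=".getBytes(StandardCharsets.US_ASCII); // base64 of "hello"
    System.out.println(interpret(applyEncoding(raw, "base64"), "string")); // hello
  }
}
```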

Extending Functionality

To add custom message-processing logic, extend PubSubIngestingPatch and override its ingest method, then reference your subclass in the @agent class field of server.recon:

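import nstream.adapter.common.schedule.DeferrableException; // package assumed; verify for your nstream-toolkit version
import nstream.adapter.pubsub.PubSubIngestingPatch;
import swim.structure.Value;

public class CustomIngestingAgent extends PubSubIngestingPatch {

  @Override
  protected void ingest(Value resultStructure) throws DeferrableException {
    // Called by the adapter for each ingested message.
    // Add custom processing logic here (filtering, transformation, routing).
  }
}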

Troubleshooting Tips

Advanced Settings

Additional Resources

Manual Acknowledgements

By default, messages are acknowledged as soon as they are consumed, without waiting for downstream Web Agents to finish processing them. A message whose downstream processing fails will therefore generally not be redelivered by Pub/Sub. Future nstream-toolkit releases will add finer control over message acknowledgements. We welcome your feedback and suggestions on this functionality.
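The difference between acknowledging on consumption and acknowledging after processing shows up when downstream processing fails. The hypothetical AckTiming simulation below uses plain Java collections, not the Pub/Sub client or Nstream APIs, and treats a message left in the pending queue as one that Pub/Sub would redeliver.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical simulation, not Pub/Sub or Nstream code: contrasts acking a
// message on receipt with acking only after processing succeeds. A message
// still in `pending` stands in for one Pub/Sub would redeliver.
public final class AckTiming {

  private AckTiming() {}

  public static void deliver(Deque<String> pending, Consumer<String> processor,
                             boolean ackOnConsume) {
    String message = pending.peekFirst();
    if (message == null) {
      return;
    }
    if (ackOnConsume) {
      pending.removeFirst(); // acked before processing; a failure below loses it
    }
    try {
      processor.accept(message);
      if (!ackOnConsume) {
        pending.removeFirst(); // acked only after processing succeeded
      }
    } catch (RuntimeException e) {
      // Processing failed. With ack-after-processing the message is still
      // pending (eligible for redelivery); with ack-on-consume it is gone.
    }
  }

  public static void main(String[] args) {
    Consumer<String> failing = m -> { throw new IllegalStateException("downstream error"); };

    Deque<String> onConsume = new ArrayDeque<>(List.of("m1"));
    deliver(onConsume, failing, true);
    System.out.println("ack-on-consume, pending after failure: " + onConsume.size()); // 0

    Deque<String> afterProcessing = new ArrayDeque<>(List.of("m1"));
    deliver(afterProcessing, failing, false);
    System.out.println("ack-after-processing, pending after failure: " + afterProcessing.size()); // 1
  }
}
```

The adapter's current default behaves like ackOnConsume = true in this model.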


Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).