Dynamic Stream Operator Generation using OpenAI and SwimOS

Discover how combining OpenAI’s ChatGPT with the real-time data processing capabilities of SwimOS can turn your data streams into actionable insights on the fly. Streaming systems handle continuous, potentially unbounded streams of data, processing each item as it arrives rather than waiting for a batch to accumulate. Stream operators process this data incrementally: they apply transformations, computations, or aggregations to each new value while keeping only the state they need, because an unbounded stream can never be held in memory as a whole. In this article, we demonstrate how to generate stream operators dynamically with ChatGPT and inject them into your application at runtime.
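To make the idea of an incremental operator concrete, here is a minimal sketch of a running mean that carries only a count and a total between events. The function name and the accumulator keys are illustrative and are not part of SwimOS or the companion code.

```python
from typing import Tuple


def running_mean(acc: dict, new_value: float) -> Tuple[dict, float]:
    """Update a running mean while storing only two numbers per stream."""
    count = acc.get("count", 0) + 1
    total = acc.get("total", 0.0) + new_value
    acc = {"count": count, "total": total}
    return acc, total / count


acc = {}
for price in [10.0, 20.0, 30.0]:
    acc, mean = running_mean(acc, price)
    print(mean)  # prints 10.0, then 15.0, then 20.0
```

The accumulator-in, accumulator-out shape is the same one the generated operators use later in this article.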

Technical Overview

Our application consists of the following components:

You can find the source (src/snippet.py) in the corresponding repo.

Companion Video

Here is a video demo of what is showcased below.

Code Walkthrough

Dependencies

We start by importing the required libraries, including openai for ChatGPT and swimos for SwimOS. We also load environment variables from a .env file, which contains our OpenAI API key.

import json
import os
import re
import time

from dotenv import load_dotenv
from openai import OpenAI
from swimos import SwimClient

SwimOS and OpenAI Client Setup

We initialize the OpenAI and SwimOS clients and choose the host URI of the stock data feed. Two feeds are available: a simulated feed that runs 24/7, and a live feed (commented out) that is available during market hours.

# Load environment variables from .env file
load_dotenv()

# host_uri = "wss://stocks-live.nstream-demo.io"    # live feed during market hours
host_uri = "wss://stocks-simulated.nstream-demo.io" # simulated feed 24/7
current_exchange_rate = 1.2
current_alert_threshold = 50.0
synced = False

llm_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
swim_client = SwimClient(debug=True)
swim_client.start()

We create a value downlink using the SwimOS Python client and register a callback to receive updates. A second callback, wait_did_sync, fires once the downlink has synced; setup_value_downlink blocks until then, so the downlink it returns is ready to receive updates.

def wait_did_sync():
    global synced
    synced = True

def setup_value_downlink(node_uri: str, callback=None):
    global swim_client
    global synced
    value_downlink = swim_client.downlink_value()
    value_downlink.set_host_uri(host_uri)
    value_downlink.set_node_uri(node_uri)
    value_downlink.set_lane_uri("status")
    if callback is not None:
        value_downlink.did_set(callback)
    value_downlink.did_sync(wait_did_sync)
    value_downlink.open()
    while not synced:
        time.sleep(1)
    return value_downlink
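The polling loop above works for a single downlink, but the shared `synced` flag is never reset, so a second call to setup_value_downlink would return before its downlink has synced. One possible alternative, sketched here without any SwimOS calls, is to give each downlink its own `threading.Event`. `SyncGate` is a hypothetical helper, and the sketch assumes the sync callback is invoked without arguments, as `wait_did_sync` is above.

```python
import threading


class SyncGate:
    """Blocks a caller until a callback signals readiness (hypothetical helper)."""

    def __init__(self) -> None:
        self._event = threading.Event()

    def on_sync(self) -> None:
        # Intended to be registered as the downlink's sync callback.
        self._event.set()

    def wait(self, timeout: float = 10.0) -> bool:
        # Returns True if the callback fired before the timeout elapsed.
        return self._event.wait(timeout)


gate = SyncGate()
# A timer stands in for the downlink invoking its sync callback.
threading.Timer(0.1, gate.on_sync).start()
print(gate.wait(timeout=2.0))
```

Waiting on an event also avoids the up-to-one-second delay of the sleep loop, and the timeout gives the caller a way to fail instead of blocking forever.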

ChatGPT Code Generation

We define a function to generate code using ChatGPT. Because responses can be unpredictable, the request is wrapped in a retry loop. We call the chat.completions.create endpoint and, when JSON is expected, use a regular expression to extract the first JSON object from the reply, ignoring any surrounding text.

def generate_llm_code(
    prompt: str,
    expect_json: bool = False,
    max_retries: int = 3,
    retry_delay: int = 1):

    retries = 0
    while retries < max_retries:
        try:
            response = llm_client.chat.completions.create(
                messages=[
                    {
                        "role": "user",
                        "content": prompt,
                    }
                ],
                model="gpt-4",
                max_tokens=1000
            )
            response_content = response.choices[0].message.content.strip()

            if expect_json:
                # Extract the first JSON object; the pattern matches balanced
                # braces up to three levels deep
                json_match = re.search(
                    r'\{(?:[^{}]|\{(?:[^{}]|\{[^{}]*\})*\})*\}',
                    response_content)
                if json_match:
                    json_str = json_match.group(0)
                    try:
                        result = json.loads(json_str)
                        # Unwrap the `result` key when present
                        return result.get('result', result)
                    except json.JSONDecodeError as e:
                        raise ValueError("Failed to decode JSON from LLM response") from e
                else:
                    raise ValueError("No valid JSON found in LLM response")
            else:
                return response_content

        except Exception as e:
            retries += 1
            print(f"Error: {e}, retrying... ({retries}/{max_retries})")
            time.sleep(retry_delay)

    raise ValueError("Max retries exceeded, failed to get valid response from LLM")
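The regular expression above handles up to three levels of nested braces and can be thrown off by an unbalanced brace inside a string value, which is plausible when the value is Python source. As a possible alternative, the sketch below uses the standard library's `json.JSONDecoder.raw_decode` to parse from each opening brace in turn. The function name `extract_first_json_object` and the sample reply are made up for illustration.

```python
import json


def extract_first_json_object(text: str) -> dict:
    """Return the first JSON object embedded in text, or raise ValueError."""
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            obj, _end = decoder.raw_decode(text, start)
            if isinstance(obj, dict):
                return obj
        except json.JSONDecodeError:
            pass  # Not valid JSON from this brace; try the next one.
        start = text.find("{", start + 1)
    raise ValueError("No valid JSON object found")


reply = 'Sure! Here it is: {"result": "def func(acc, new_value, params): ..."} Hope that helps.'
print(extract_first_json_object(reply)["result"])
```

Because the decoder understands JSON strings and nesting, it does not depend on brace counting.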

Dynamic Function Generation

We ask ChatGPT to generate Python code for a stream operator, load it with exec, and call it from the downlink callback on every update. This lets us handle new data processing needs without redeploying the application. In this example, we request a function that maintains a simple moving average with a window size of 5.

def accumulate_generate(symbol: str, streaming_operator: str, parameters: dict):
    """Generate a streaming operator with the LLM and apply it to live stock prices"""
    global acc
    acc = {}

    prompt = f"""
    Return a JSON result, and only a JSON result. The JSON must have a single
    top-level key: `result`. In this `result` key, store a string that contains
    a python function with the following signature
    `def func(acc: dict, new_value: float, params: dict):`
    and the implementation must be as follows: calculate the {streaming_operator}
    on `new_value` given accumulator state of `acc` that your function has
    defined in order to continue applying the {streaming_operator} as each new
    value arrives. Your function must return a tuple consisting of `acc` followed
    by the result of its calculation. The parameters for this operation are: {parameters}.
    """
    function_code_str = generate_llm_code(prompt, expect_json=True)

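    # Load the generated code into a single namespace so that any imports or
    # helpers it defines at module level are visible inside the function.
    # exec runs arbitrary code; only use it with output you trust.
    namespace = {}
    exec(function_code_str, namespace)
    # Find the first function definition rather than assuming it is on line one
    func_name = re.search(r'def\s+(\w+)\s*\(', function_code_str).group(1)
    func = namespace[func_name]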

    def accumulate_generate_callback(new_value: dict, _old_value: dict):
        global acc
        print(f"accumulate_generate_callback received for {symbol}: {new_value}.\n")
        acc, summary = func(acc, new_value['price'], parameters)
        print(f"{symbol} -- summary: {summary}; acc: {acc}")

    node_uri = f"/stock/{symbol}"
    print('Streaming data, press Ctrl+C to stop')
    value_downlink = setup_value_downlink(node_uri, accumulate_generate_callback)
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        value_downlink.close()
        print('Streaming stopped')

if __name__ == "__main__":
    # accumulate_generate blocks until Ctrl+C and returns nothing to print
    accumulate_generate("AAAA", "simple moving average", {"window_size": 5})
    swim_client.stop()
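For reference, here is a hand-written example of the kind of function the prompt asks for, loaded with exec and driven by a few sample prices. The body of `generated_code` is an assumption about what the model might return for a simple moving average, not captured output, and the window size of 3 is chosen only to keep the trace short.

```python
generated_code = '''
def func(acc: dict, new_value: float, params: dict):
    window = acc.get("window", [])
    window.append(new_value)
    # Keep only the most recent window_size values.
    window = window[-params["window_size"]:]
    acc = {"window": window}
    return acc, sum(window) / len(window)
'''

namespace = {}
exec(generated_code, namespace)  # Only run code you trust or have sandboxed.
func = namespace["func"]

acc = {}
params = {"window_size": 3}
for price in [10.0, 20.0, 30.0, 40.0]:
    acc, sma = func(acc, price, params)
    print(sma)  # prints 10.0, 15.0, 20.0, 30.0
```

Once the window is full, each new price displaces the oldest one, so the state stays bounded no matter how long the stream runs.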

Conclusion

In this article, we demonstrated how to integrate OpenAI’s ChatGPT and SwimOS for dynamic stream operator generation. By pairing ChatGPT’s code generation with SwimOS’s real-time streaming data processing, we can add new stream operators for ad-hoc use cases at runtime, without redeploying the application. As mentioned, the code is in src/snippet.py. The repo also contains a console program, src/main.py, that incorporates additional stream operators and shows how to enable natural-language queries for ad-hoc stream processing.

Now it’s your turn! Try implementing these patterns in your own projects and see the benefits of dynamic, real-time data processing firsthand.

To learn more about SwimOS, please visit https://www.swimos.org/