Examples¶
Hello World¶
A minimal server with two unary methods — greet and add. Supports both stdio and HTTP transports.
// © Copyright 2025-2026, Query.Farm LLC - https://query.farm
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"fmt"
"net"
"net/http"
"os"
"github.com/Query-farm/vgi-rpc/vgirpc"
)
type GreetParams struct {
Name string `vgirpc:"name"`
}
type AddParams struct {
A float64 `vgirpc:"a"`
B float64 `vgirpc:"b"`
}
func main() {
server := vgirpc.NewServer()
vgirpc.Unary(server, "greet", func(_ context.Context, ctx *vgirpc.CallContext, p GreetParams) (string, error) {
return "Hello, " + p.Name + "!", nil
})
vgirpc.Unary(server, "add", func(_ context.Context, ctx *vgirpc.CallContext, p AddParams) (float64, error) {
return p.A + p.B, nil
})
if len(os.Args) > 1 && os.Args[1] == "--http" {
httpServer := vgirpc.NewHttpServer(server)
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
fmt.Fprintf(os.Stderr, "failed to listen: %v\n", err)
os.Exit(1)
}
port := listener.Addr().(*net.TCPAddr).Port
fmt.Printf("PORT:%d\n", port)
os.Stdout.Sync()
if err := http.Serve(listener, httpServer); err != nil {
fmt.Fprintf(os.Stderr, "http serve error: %v\n", err)
os.Exit(1)
}
} else {
server.RunStdio()
}
}
Run it:
# Build and run with stdio transport
go build -o hello-world ./examples/hello_world/
./hello-world
# Or with HTTP transport
./hello-world --http
Math Service¶
A more complete example with all four method types: unary (add, multiply), producer (countdown), and exchange (running_sum).
// © Copyright 2025-2026, Query.Farm LLC - https://query.farm
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"fmt"
"net"
"net/http"
"os"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/Query-farm/vgi-rpc/vgirpc"
)
func init() {
// Register state types for gob serialization (needed for HTTP transport).
vgirpc.RegisterStateType(&CountdownState{})
vgirpc.RegisterStateType(&RunningSumState{})
}
// --- Parameter structs ---
type AddParams struct {
A float64 `vgirpc:"a"`
B float64 `vgirpc:"b"`
}
type MultiplyParams struct {
A float64 `vgirpc:"a"`
B float64 `vgirpc:"b"`
}
type CountdownParams struct {
Start int64 `vgirpc:"start"`
}
type RunningSumParams struct {
Initial float64 `vgirpc:"initial,default=0"`
}
// --- Schemas ---
var CountdownOutputSchema = arrow.NewSchema([]arrow.Field{
{Name: "value", Type: arrow.PrimitiveTypes.Int64},
}, nil)
var RunningSumInputSchema = arrow.NewSchema([]arrow.Field{
{Name: "value", Type: arrow.PrimitiveTypes.Float64},
}, nil)
var RunningSumOutputSchema = arrow.NewSchema([]arrow.Field{
{Name: "sum", Type: arrow.PrimitiveTypes.Float64},
}, nil)
// --- Producer state: countdown ---
type CountdownState struct {
Current int64
}
func (s *CountdownState) Produce(_ context.Context, out *vgirpc.OutputCollector, _ *vgirpc.CallContext) error {
if s.Current < 0 {
return out.Finish()
}
mem := memory.NewGoAllocator()
b := array.NewInt64Builder(mem)
defer b.Release()
b.Append(s.Current)
arr := b.NewArray()
defer arr.Release()
if err := out.EmitArrays([]arrow.Array{arr}, 1); err != nil {
return err
}
s.Current--
return nil
}
// --- Exchange state: running_sum ---
type RunningSumState struct {
Sum float64
}
func (s *RunningSumState) Exchange(_ context.Context, input arrow.RecordBatch, out *vgirpc.OutputCollector, _ *vgirpc.CallContext) error {
valueCol := input.Column(0).(*array.Float64)
for i := int64(0); i < input.NumRows(); i++ {
s.Sum += valueCol.Value(int(i))
}
mem := memory.NewGoAllocator()
b := array.NewFloat64Builder(mem)
defer b.Release()
b.Append(s.Sum)
arr := b.NewArray()
defer arr.Release()
return out.EmitArrays([]arrow.Array{arr}, 1)
}
// --- Main ---
func main() {
server := vgirpc.NewServer()
// Unary: add two numbers
vgirpc.Unary(server, "add", func(_ context.Context, _ *vgirpc.CallContext, p AddParams) (float64, error) {
return p.A + p.B, nil
})
// Unary: multiply two numbers
vgirpc.Unary(server, "multiply", func(_ context.Context, _ *vgirpc.CallContext, p MultiplyParams) (float64, error) {
return p.A * p.B, nil
})
// Producer: countdown from start to 0
vgirpc.Producer(server, "countdown", CountdownOutputSchema,
func(_ context.Context, _ *vgirpc.CallContext, p CountdownParams) (*vgirpc.StreamResult, error) {
return &vgirpc.StreamResult{
OutputSchema: CountdownOutputSchema,
State: &CountdownState{Current: p.Start},
}, nil
})
// Exchange: running sum of input values
vgirpc.Exchange(server, "running_sum", RunningSumOutputSchema, RunningSumInputSchema,
func(_ context.Context, _ *vgirpc.CallContext, p RunningSumParams) (*vgirpc.StreamResult, error) {
return &vgirpc.StreamResult{
OutputSchema: RunningSumOutputSchema,
State: &RunningSumState{Sum: p.Initial},
}, nil
})
if len(os.Args) > 1 && os.Args[1] == "--http" {
httpServer := vgirpc.NewHttpServer(server)
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
fmt.Fprintf(os.Stderr, "failed to listen: %v\n", err)
os.Exit(1)
}
port := listener.Addr().(*net.TCPAddr).Port
fmt.Printf("PORT:%d\n", port)
os.Stdout.Sync()
if err := http.Serve(listener, httpServer); err != nil {
fmt.Fprintf(os.Stderr, "http serve error: %v\n", err)
os.Exit(1)
}
} else {
server.RunStdio()
}
}
Python Client¶
The math service includes a Python client that demonstrates calling all method types across both transports:
"""Demo client for the math_service Go server.
Demonstrates all four RPC method types (unary, producer, exchange) across
both subprocess (stdio) and HTTP transports.
Requires: pip install "vgi-rpc[http]"
Run with:
python examples/math_service/client.py
"""
from __future__ import annotations
import subprocess
import time
from pathlib import Path
from typing import Protocol
from vgi_rpc.http import http_connect
from vgi_rpc.rpc import AnnotatedBatch, RpcConnection, Stream, StreamState, SubprocessTransport
BINARY = "./math-service"
# ---------------------------------------------------------------------------
# Protocol definition — matches the Go server's registered methods
# ---------------------------------------------------------------------------
class MathService(Protocol):
def add(self, a: float, b: float) -> float: ...
def multiply(self, a: float, b: float) -> float: ...
def countdown(self, start: int) -> Stream[StreamState]: ...
def running_sum(self, initial: float = 0.0) -> Stream[StreamState]: ...
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def demo_all_methods(proxy: MathService, transport_name: str) -> None:
"""Exercise every method on the given proxy."""
print(f"\n{'=' * 50}")
print(f" {transport_name} transport")
print(f"{'=' * 50}")
# --- Unary: add ---
result = proxy.add(a=3.0, b=4.0)
print(f"\nadd(3, 4) = {result}")
assert result == 7.0
# --- Unary: multiply ---
result = proxy.multiply(a=6.0, b=7.0)
print(f"multiply(6, 7) = {result}")
assert result == 42.0
# --- Producer: countdown ---
print("\ncountdown(start=5):")
batches = list(proxy.countdown(start=5))
for ab in batches:
val = ab.batch.column("value")[0].as_py()
print(f" value={val}")
assert len(batches) == 6 # 5, 4, 3, 2, 1, 0
assert batches[0].batch.column("value")[0].as_py() == 5
assert batches[-1].batch.column("value")[0].as_py() == 0
# --- Exchange: running_sum ---
print("\nrunning_sum(initial=0):")
with proxy.running_sum(initial=0.0) as session:
for values in [[1.0, 2.0], [3.0], [4.0, 5.0]]:
inp = AnnotatedBatch.from_pydict({"value": values})
out = session.exchange(inp)
running = out.batch.column("sum")[0].as_py()
print(f" sent {values} -> sum={running}")
print("\nAll assertions passed!")
# ---------------------------------------------------------------------------
# Subprocess transport demo
# ---------------------------------------------------------------------------
def demo_subprocess() -> None:
transport = SubprocessTransport([BINARY])
try:
with RpcConnection(MathService, transport) as proxy: # type: ignore[type-abstract]
demo_all_methods(proxy, "subprocess (stdio)")
finally:
transport.close()
# ---------------------------------------------------------------------------
# HTTP transport demo
# ---------------------------------------------------------------------------
def demo_http() -> None:
proc = subprocess.Popen(
[BINARY, "--http"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
try:
assert proc.stdout is not None
line = proc.stdout.readline().decode().strip()
assert line.startswith("PORT:"), f"Expected PORT:<n>, got: {line!r}"
port = int(line.split(":", 1)[1])
# Wait for server readiness
import httpx
deadline = time.monotonic() + 5.0
while time.monotonic() < deadline:
try:
httpx.get(f"http://127.0.0.1:{port}/", timeout=1.0)
break
except (httpx.ConnectError, httpx.ConnectTimeout):
time.sleep(0.1)
except httpx.HTTPStatusError:
break
with http_connect(MathService, f"http://127.0.0.1:{port}") as proxy: # type: ignore[type-abstract]
demo_all_methods(proxy, "HTTP")
finally:
proc.terminate()
proc.wait(timeout=5)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
if __name__ == "__main__":
# Build the server binary
repo_root = Path(__file__).resolve().parent.parent.parent
print("Building math-service binary...")
subprocess.run(
["go", "build", "-o", str(repo_root / BINARY), "./examples/math_service/"],
check=True,
cwd=str(repo_root),
)
print("Build complete.")
demo_subprocess()
demo_http()
print("\nAll demos completed successfully.")
Run the demo: