Plumber
Plumber is a single-binary CLI devtool for inspecting, piping, massaging, and redirecting data in various messaging and database systems.
`plumber` enables you to:
- Safely view the contents of your data streams
- Write plain or encoded data to any system
- Route data from one place to another
- Decode protobuf/avro/thrift/JSON data in real-time
- Relay data to the Streamdal platform
- Ship change data capture events to the Streamdal platform
- Replay events into a message system on your local network
- Programmatically manage relays and tunnels for the Streamdal platform
- And many other features (run `plumber -h` for the full list)
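As a quick taste of the CLI, here is a hypothetical session against a local Kafka broker. Treat the flags as illustrative - they vary between plumber versions, so confirm against `plumber -h`:

```bash
# Read messages from a Kafka topic as they arrive
# (assumes a broker on localhost:9092; flag names may differ per version)
$ plumber read kafka --address localhost:9092 --topics orders --follow

# Write a plain-text message to the same topic
$ plumber write kafka --address localhost:9092 --topics orders --input "hello world"
```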
The `plumber` tool allows you to read, write, relay, and tunnel data right from your CLI. While this is fine for quick one-offs, it has some drawbacks if you are planning to make use of the relay or tunnel functionality in production or in a more serious capacity:
- You will need to launch a new `plumber` instance for every relay and tunnel
- High availability is difficult to pull off
- There is potential for missed data while `plumber` is not running (such as during a redeploy)
This might be acceptable depending on your integration needs. If `plumber` exists in dev to facilitate testing or to let developers tunnel data to their local workstations, it's probably fine. But if you are planning on running many relays and/or many tunnels, it wouldn't be well-suited.
To address this, we have built `plumber server` mode.
Plumber's server mode enables you to run any number of `plumber` instances as a distributed cluster. Server mode exposes a gRPC API which can be used to run and manage multiple relays and tunnels in parallel.
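As a sketch of what talking to that API looks like, you could point `grpcurl` at a running server. This assumes the API listens on `localhost:9090` and that gRPC server reflection is enabled - if it isn't, you'd work from the protobuf definitions in https://github.com/streamdal/plumber-schemas instead:

```bash
# List the gRPC services exposed by a running plumber server
# (assumes reflection is enabled; otherwise point grpcurl at the
# .proto files from the plumber-schemas repo)
$ grpcurl -plaintext localhost:9090 list
```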
`plumber server` can run in either of two operational modes: **standalone** or **cluster**.
Operational Modes
Standalone
In standalone mode, `plumber` writes its config to a local JSON file (located at `~/.batchsh/config.json`).
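Since the config is plain JSON, you can inspect it with standard tooling; its schema isn't documented here, so this sketch just pretty-prints the file and lists its top-level keys:

```bash
# Pretty-print the standalone config and list its top-level keys
$ jq . ~/.batchsh/config.json
$ jq 'keys' ~/.batchsh/config.json
```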
Suitable for:
- Local and dev environments
- Low throughput (<100 events per second)
- Use cases where high reliability is not required
Cluster
In cluster mode, `plumber` uses a message bus (NATS) to facilitate communication between the `plumber` instances and to store its configs. You can run any number of `plumber` instances - we recommend starting with 3.
Suitable for:
- Production environments
- High throughput (1,000+ events per second)
- Use cases where high reliability is required
How to Run
Standalone
Via the CLI:

```bash
# Install plumber
$ brew tap streamdal/public
...
$ brew install plumber
...

# Launch plumber in standalone mode
$ plumber server
INFO[0000]
... (ASCII art banner) ...
INFO[0000] starting plumber server in 'standalone' mode... pkg=plumber
INFO[0005] plumber server started                          pkg=plumber
```

Via Docker - since standalone mode writes its config to `~/.batchsh/config.json`, you will want to mount that location as a writable volume in the container:

```bash
$ docker run --name plumber-server -p 8080:8080 \
    -v plumber-config:/Users/username/.batchsh:rw \
    streamdal/plumber:latest
```
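To confirm the server actually came up inside the container, tail its logs (plain Docker, nothing plumber-specific):

```bash
$ docker logs -f plumber-server
```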
Via Kubernetes:

```yaml
---
apiVersion: v1
kind: Service
metadata:
  name: plumber-standalone
  labels:
    app.kubernetes.io/name: plumber-standalone
spec:
  type: ClusterIP
  ports:
    - port: 9090
      targetPort: 9090
      protocol: TCP
      name: grpc-api
  selector:
    app.kubernetes.io/name: plumber-standalone
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: plumber-standalone
  labels:
    app.kubernetes.io/name: plumber-standalone
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: plumber-standalone
  template:
    metadata:
      labels:
        app.kubernetes.io/name: plumber-standalone
    spec:
      containers:
        - name: plumber-standalone
          image: "streamdal/plumber:v1.4.0"
          imagePullPolicy: IfNotPresent
          command: ["/plumber-linux", "server"]
          ports:
            - containerPort: 9090
          env:
            - name: PLUMBER_SERVER_ENABLE_CLUSTER
              value: "false"
            - name: PLUMBER_SERVER_NODE_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
```
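To deploy, apply the manifest and port-forward the gRPC API to your workstation (the file name here is just whatever you saved the manifest as):

```bash
$ kubectl apply -f plumber-standalone.yaml
$ kubectl port-forward svc/plumber-standalone 9090:9090
```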
Via Helm: follow the README.md at https://github.com/streamdal/plumber-helm#standalone.
Cluster
The NATS container SHOULD have a persistent storage volume; `plumber` stores its configs in NATS, so losing the volume means losing them.
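For example, if you run NATS by hand instead of via the repo's docker-compose file, you might attach a named volume for the JetStream store directory; `--jetstream` and `--store_dir` are standard nats-server flags:

```bash
# Persist JetStream data (and with it, plumber's configs) across restarts
$ docker run -d --name natsjs \
    -p 4222:4222 \
    -v nats-data:/data \
    nats:2.7.2-alpine --jetstream --store_dir /data
```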
Via the CLI:

```bash
# Install plumber
$ brew tap streamdal/public
...
$ brew install plumber
...

# Clone the plumber repo (to get access to its docker-compose + assets)
$ git clone [email protected]:streamdal/plumber.git
$ cd plumber

# Launch a NATS container
$ docker-compose up -d natsjs

# Launch plumber in cluster mode
$ plumber server --enable-cluster
INFO[0000]
... (ASCII art banner) ...
INFO[0000] starting plumber server in 'cluster' mode... pkg=plumber
INFO[0015] plumber server started                       pkg=plumber
```
Via Docker:

```bash
# Clone the plumber repo (to get access to its docker-compose + assets)
$ git clone [email protected]:streamdal/plumber.git
$ cd plumber

# Launch a NATS container
$ docker-compose up -d natsjs

# Launch a plumber container that points to your NATS instance
$ docker run --name plumber-server -p 9090:9090 \
    --network plumber_default \
    -e PLUMBER_SERVER_NATS_URL=nats://natsjs \
    -e PLUMBER_SERVER_ENABLE_CLUSTER=true \
    streamdal/plumber:latest
{"level":"info","msg":"starting plumber server in 'cluster' mode...","pkg":"plumber","time":"2022-02-15T20:21:47Z"}
{"level":"info","msg":"plumber server started","pkg":"plumber","time":"2022-02-15T20:22:02Z"}
```
Via Kubernetes:

```yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: nats-config
  namespace: default
  labels:
    app.kubernetes.io/name: nats
data:
  nats.conf: |
    # NATS Clients Port
    port: 4222

    # PID file shared with configuration reloader.
    pid_file: "/var/run/nats/nats.pid"

    http: 8222
    server_name: $POD_NAME

    jetstream {
      max_mem: 1Gi
      store_dir: /data
      max_file: 5Gi
    }

    lame_duck_duration: 120s
---
apiVersion: v1
kind: Service
metadata:
  name: nats
  namespace: default
  labels:
    app.kubernetes.io/name: nats
spec:
  selector:
    app.kubernetes.io/name: nats
  clusterIP: None
  ports:
    - name: client
      port: 4222
    - name: cluster
      port: 6222
    - name: monitor
      port: 8222
    - name: metrics
      port: 7777
    - name: leafnodes
      port: 7422
    - name: gateways
      port: 7522
---
apiVersion: v1
kind: Service
metadata:
  name: plumber-cluster
  labels:
    app.kubernetes.io/name: plumber-cluster
spec:
  type: ClusterIP
  ports:
    - port: 9090
      targetPort: 9090
      protocol: TCP
      name: grpc-api
  selector:
    app.kubernetes.io/name: plumber-cluster
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nats-box
  namespace: default
  labels:
    app: nats-box
    chart: nats-0.13.0
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nats-box
  template:
    metadata:
      labels:
        app: nats-box
    spec:
      containers:
        - name: nats-box
          image: natsio/nats-box:0.8.1
          imagePullPolicy: IfNotPresent
          resources: null
          env:
            - name: NATS_URL
              value: nats
          command:
            - "tail"
            - "-f"
            - "/dev/null"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: plumber-cluster
  labels:
    app.kubernetes.io/name: plumber-cluster
spec:
  replicas: 3
  selector:
    matchLabels:
      app.kubernetes.io/name: plumber-cluster
  template:
    metadata:
      labels:
        app.kubernetes.io/name: plumber-cluster
    spec:
      containers:
        - name: plumber-cluster
          image: "streamdal/plumber:v1.4.0"
          imagePullPolicy: IfNotPresent
          command: ["/plumber-linux", "server"]
          ports:
            - containerPort: 9090
          env:
            - name: PLUMBER_SERVER_CLUSTER_ID
              value: "7EB6C7FB-9053-41B4-B456-78E64CF9D393"
            - name: PLUMBER_SERVER_ENABLE_CLUSTER
              value: "true"
            - name: PLUMBER_SERVER_NATS_URL
              value: "nats://nats.default.svc.cluster.local:4222"
            - name: PLUMBER_SERVER_USE_TLS
              value: "false"
            - name: PLUMBER_SERVER_NODE_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: nats
  namespace: default
  labels:
    app.kubernetes.io/name: nats
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: nats
  replicas: 1
  serviceName: nats
  podManagementPolicy: Parallel
  template:
    metadata:
      annotations:
        prometheus.io/path: /metrics
        prometheus.io/port: "7777"
        prometheus.io/scrape: "true"
      labels:
        app.kubernetes.io/name: nats
    spec:
      # Common volumes for the containers.
      volumes:
        - name: config-volume
          configMap:
            name: nats-config
        # Local volume shared with the reloader.
        - name: pid
          emptyDir: {}
      # Required to be able to HUP signal and apply config
      # reload to the server without restarting the pod.
      shareProcessNamespace: true
      terminationGracePeriodSeconds: 120
      containers:
        - name: nats
          image: nats:2.7.2-alpine
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 4222
              name: client
            - containerPort: 7422
              name: leafnodes
            - containerPort: 7522
              name: gateways
            - containerPort: 6222
              name: cluster
            - containerPort: 8222
              name: monitor
            - containerPort: 7777
              name: metrics
          command:
            - "nats-server"
            - "--config"
            - "/etc/nats-config/nats.conf"
          env:
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: SERVER_NAME
              value: $(POD_NAME)
            - name: POD_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
            - name: CLUSTER_ADVERTISE
              value: $(POD_NAME).nats.$(POD_NAMESPACE).svc.cluster.local
          volumeMounts:
            - name: config-volume
              mountPath: /etc/nats-config
            - name: pid
              mountPath: /var/run/nats
            - name: nats-js-pvc
              mountPath: /data
          livenessProbe:
            httpGet:
              path: /
              port: 8222
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 60
            successThreshold: 1
            failureThreshold: 3
          startupProbe:
            httpGet:
              path: /
              port: 8222
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 30
          # Gracefully stop NATS Server on pod deletion or image upgrade.
          lifecycle:
            preStop:
              exec:
                # Signal lame duck mode so the NATS Server can gracefully
                # terminate client connections before the pod stops.
                command:
                  - "/bin/sh"
                  - "-c"
                  - "nats-server -sl=ldm=/var/run/nats/nats.pid"
        - name: reloader
          image: natsio/nats-server-config-reloader:0.6.2
          imagePullPolicy: IfNotPresent
          resources: null
          command:
            - "nats-server-config-reloader"
            - "-pid"
            - "/var/run/nats/nats.pid"
            - "-config"
            - "/etc/nats-config/nats.conf"
          volumeMounts:
            - name: config-volume
              mountPath: /etc/nats-config
            - name: pid
              mountPath: /var/run/nats
        - name: metrics
          image: natsio/prometheus-nats-exporter:0.9.1
          imagePullPolicy: IfNotPresent
          args:
            - -connz
            - -routez
            - -subz
            - -varz
            - -prefix=nats
            - -use_internal_server_id
            - -jsz=all
            - http://localhost:8222/
          ports:
            - containerPort: 7777
              name: metrics
  volumeClaimTemplates:
    - metadata:
        name: nats-js-pvc
      spec:
        accessModes:
          - "ReadWriteOnce"
        resources:
          requests:
            storage: 5Gi
---
apiVersion: v1
kind: Pod
metadata:
  name: "nats-test-request-reply"
  labels:
    app.kubernetes.io/name: nats
  annotations:
    "hook": test
spec:
  containers:
    - name: nats-box
      image: synadia/nats-box
      env:
        - name: NATS_HOST
          value: nats
      command:
        - /bin/sh
        - -ec
        - |
          nats reply -s nats://$NATS_HOST:4222 'name.>' --command "echo 1" &
        - |
          "&&"
        - |
          name=$(nats request -s nats://$NATS_HOST:4222 name.test '' 2>/dev/null)
        - |
          "&&"
        - |
          [ $name = test ]
  restartPolicy: Never
```
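To deploy, apply the manifest and check that all three replicas come up (again, the file name is whatever you saved the manifest as):

```bash
$ kubectl apply -f plumber-cluster.yaml
$ kubectl get pods -l app.kubernetes.io/name=plumber-cluster
```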
Via Helm: follow the README.md at https://github.com/streamdal/plumber-helm#cluster-mode.
Environment Variables
Cluster
All of the following environment variables are only used when `plumber` is launched in cluster mode (with the exception of `PLUMBER_SERVER_ENABLE_CLUSTER`, which is what selects the mode).

| Variable | Description |
| --- | --- |
| `PLUMBER_SERVER_ENABLE_CLUSTER` | Set to `"true"` to enable cluster mode. If not set, `plumber` will run in standalone mode. |
| `PLUMBER_SERVER_NODE_ID` | Identifies this node. If you run `plumber` as a StatefulSet, you can assign each node its own node ID, which simplifies debugging, parsing logs, etc. If unset, it will be generated automatically at launch. |
| `PLUMBER_SERVER_NATS_URL` | Must be set to your local NATS server's address. |
| `PLUMBER_SERVER_AUTH_TOKEN` | Used for authenticating with the `plumber` instance. Set to `streamdal` by default. |
| `PLUMBER_SERVER_CLUSTER_ID` | All `plumber` instances that are part of the same cluster must share the same ID. |
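Putting those together, a cluster node might be launched like this (the values are illustrative; the cluster ID matches the Kubernetes manifest above):

```bash
$ PLUMBER_SERVER_NODE_ID=plumber-node-1 \
  PLUMBER_SERVER_CLUSTER_ID=7EB6C7FB-9053-41B4-B456-78E64CF9D393 \
  PLUMBER_SERVER_NATS_URL=nats://localhost:4222 \
  PLUMBER_SERVER_AUTH_TOKEN=streamdal \
  plumber server --enable-cluster
```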
Metrics
Plumber exposes various Prometheus metrics via its internal HTTP server: