INDEX
########################################################### 2024-01-15 11:08 ########################################################### That Devops Guy... Message Queues for Beginners https://www.youtube.com/watch?v=hfUIWe1tK8E A message queue is an async server-server messaging system (no waiting - decouples services) Delivery acknowledgement means messages pair with ACK - to say if they are being actioned docker network create rabbits docker run --rm --net rabbits docker run -d --rm --net rabbits --hostname rabbit-1 --name rabbit-1 rabbitmq:3.8 # rabbit uses ID@HOSTNAME - so need to run this in a stateful set for consistent hostnames docker logs rabbit-1 docker exec -it rabbit-1 bash rabbitmqctl # CLI tool for managing rabbitmq rabbitmq-plugins # Plugin manager - e.g. management interface and prometheus monitoring rabbitmq-plugins list # List all plugins docker run -p 8080:15672 ... rabbitmq:3.8 # Run rabbitmq but expose management port 15672 rabbitmq-plugins enable rabbitmq_management # Enable management website localhost:8080 Log in with guest:guest and can see all connections, channels and queues A channel is a virtual connection to a queue - used by programs to connect to queues # publisher/Dockerfile FROM golang:1.14-alpine as build RUN apk add --no-cache git WORKDIR /src RUN go get github.com/julienschmidt/httprouter # Website (API) RUN go get github.com/sirupsen/logrus # Logging system RUN go get github.com/streadway/amqp # MQ SDK COPY publisher.go /src RUN go build publisher.go ###### FROM alpine as runtime COPY --from=build /src/publisher /app/publisher CMD ["/app/publisher"] # publisher/publisher.go package main import ( ... 
) var rabbit_host = os.Getenv("RABBIT_HOST") var rabbit_port = os.Getenv("RABBIT_PORT") var rabbit_user = os.Getenv("RABBIT_USER") var rabbit_password = os.Getenv("RABBIT_PASSWORD") func main() { router := httprouter.New() router.POST( "/publish/:message", func(w http.ResponseWriter, r *http.Request, p httprouter.Params){ submit(w,r,p) } ) fmt.Println("Running...") log.Fatal(http.ListenAndServe(":80", router)) } func submit(writer http.ResponseWriter, request *http.Request, p httprouter.Params) { message := p.ByName("message") fmt.Println("Received message: " + message) conn, err := amqp.Dial( "amqp://" + rabbit_user + ":" + rabbit_password + "@" + rabbit_host + ":" + rabbit_port + "/" ) if err != nil { log.Fatalf("%s: %s", "Failed to connect to RabbitMQ", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("%s: %s", "Failed to open a channel", err) } defer ch.Close() q, err := ch.QueueDeclare("publisher", false, false, false, false, nil) if err != nil { log.Fatalf("%s: %s", "Failed to declare a queue", err) } err = ch.Publish("", q.Name, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }) if err != nil { log.Fatalf("%s: %s", "Failed to publish a message", err) } fmt.Println("Publish Success") } docker build . -t aimvector/rabbitmq-publisher:v1.0.0 docker run -it --rm --net rabbits -e RABBIT_HOST=rabbit-1 -e RABBIT_PORT=5672 \ -e RABBIT_USER=guest -e RABBIT_PASSWORD=guest \ -p 80:80 aimvector/rabbitmq-publisher:v1.0.0 # Now application is running, can use something like postman to send data # Sending "localhost:80/publish/Hello" adds "hello" to the "publisher" queue Now similarly make a consumer - basically the same but called "consumer" # consumer.go package main import (...) ... 
// get environment variables func main() { consume() } func consume() { conn, err := amqp.Dial( "amqp://" + rabbit_user + ":" + rabbit_password + "@" + rabbit_host + ":" + rabbit_port + "/" ) if err != nil { log.Fatalf("%s: %s", "Failed to connect to RabbitMQ", err) } ch, err := conn.Channel() if err != nil { log.Fatalf("%s: %s", "Failed to open a channel", err) } q, err := ch.QueueDeclare("publisher", false, false, false, false, nil) if err != nil { log.Fatalf("%s: %s", "Failed to declare a queue", err) } fmt.Println("Channel and Queue established") defer conn.Close() defer ch.Close() // Here you can enable things like "delivery acknowledgement" msgs, err := ch.Consume(q.Name, "", false, false, false, false, nil) if err != nil { log.Fatalf("%s: %s", "Failed to register a consumer", err) } forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) d.Ack(false) } }() fmt.Println("Running") <-forever } docker build . -t aimvector/rabbitmq-consumer:v1.0.0 docker run -it --rm --net rabbits -e RABBIT_HOST=rabbit-1 -e RABBIT_PORT=5672 \ -e RABBIT_USER=guest -e RABBIT_PASSWORD=guest aimvector/rabbitmq-consumer:v1.0.0 # Automatically this prints out the "Hello" message and removes it from the queue
That Devops Guy... How to setup a RabbitMQ cluster https://www.youtube.com/watch?v=FzqjtU2x6YA RabbitMQ uses node identifiers (PREFIX@DOMAIN e.g. rabbit@rabbit-1) Need stateful sets in k8 as deployments have random names - in cloud, EC2 hostnames are key docker network create rabbits # All critical data in rabbitmq is replicated across nodes by default in a cluster # Cluster uses an "erlang cookie" for each instance to talk to each other docker run -d --rm --net rabbits --hostname rabbit-1 --name rabbit-1 rabbitmq:3.8 docker exec -it rabbit-1 cat /var/lib/rabbitmq/.erlang.cookie # Copy this value Form a cluster manually with rabbitmqctl then use config file to build cluster docker run -d --rm --net rabbits --hostname rabbit-N --name rabbit-N \ -p 808N:15672 rabbitmq:3.8-management # Run this with N=1,2,3 to run 3 separate rabbitmq nodes with management dash enabled # Login with guest:guest docker exec -it rabbit-1 rabbitmqctl cluster_status # Check if inside a cluster # Join a node to node-1 to form a cluster - removes all data and joins docker exec -it rabbit-2 rabbitmqctl stop_app ... reset; ... join_cluster rabbit@rabbit-1; ... start_app; ... cluster_status # This fails as need to have that cookie value # All containers need to be run with same erlang cookie to work - restart containers and run docker run -d --rm --net rabbits --hostname rabbit-N --name rabbit-N \ -e RABBITMQ_ERLANG_COOKIE=... 
-p 808N:15672 rabbitmq:3.8-management # Now run the join commands and cluster is joined together Container state is lost when it is deleted - so want to use automation to keep state # rabbit-1/rabbitmq.conf loopback_users.guest = false listeners.tcp.default = 5672 cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config cluster_formation.classic_config.nodes.1 = rabbit@rabbit-1 cluster_formation.classic_config.nodes.2 = rabbit@rabbit-2 cluster_formation.classic_config.nodes.3 = rabbit@rabbit-3 # Repeat the same for rabbit-2 and rabbit-3 - knows where peers are # To run container using this config - again repeat with N=1,2,3 docker run -d --rm --net rabbits -v ${PWD}/config/rabbit-N/:/config/ \ -e RABBITMQ_CONFIG_FILE=/config/rabbitmq -e RABBITMQ_ERLANG_COOKIE=... \ --hostname rabbit-N --name rabbit-N -p 808N:15672 rabbitmq:3.8-management # Uses different config for each rabbitmq as mounting it may cause weird overwriting # Now cluster will automatically form Rabbitmq only copies operational data when necessary - by default data is not mirrored Classic mirror queues - can select number of mirrors to have or all or specific nodes docker exec -it rabbit-1 bash # Set rules based on regex matching - ".*" to apply to all rabbitmqctl set-policy ha-fed \ ".*" '{"federation-upstream-set":"all", "ha-mode":"nodes", \ "ha-params":["rabbit@rabbit-1","rabbit@rabbit-2","rabbit@rabbit-3"]}' \ --priority 1 --apply-to queues rabbitmq-plugins enable rabbitmq_federation # Run this on all nodes for this to work # Now any queue created on node1 is mirrored to nodes 2 and 3 - node1 is now master So rabbit1 is the master - if that dies then another node (say node2) becomes master If rabbit1 comes back up, it comes back as a mirror of node2 but out of sync with cluster Need to enable sync in the policy command rabbitmqctl set-policy ha-fed ... "ha-sync-mode":"automatic" ...
That Devops Guy... RabbitMQ on Kubernetes https://www.youtube.com/watch?v=_lpDfMkxccc Make a basic cluster and launch rabbitmq kind create cluster --name rabbit --image kindest/node:v1.18.4 kubectl create ns rabbits kubectl get storageclass # Check what storage types can be used - use local default Need to create a service account for rabbitmq to use # rabbit-rbac.yaml apiVersion: v1 kind: ServiceAccount metadata: name: rabbitmq --- apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: rabbitmq rules: - apiGroups: - "" resources: - endpoints verbs: - get - list - watch --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: rabbitmq namespace: rabbits subjects: - kind: ServiceAccount name: rabbitmq namespace: rabbits roleRef: apiGroup: rbac.authorization.k8s.io kind: Role name: rabbitmq Now for automation, you need to pass in ERLANG cookie secret and make them cluster by default # rabbit-secret.yaml [Secret] type: Opaque data: # echo -n "cookie-value" | base64 RABBITMQ_ERLANG_COOKIE: ... # rabbit-configmap.yaml [ConfigMap] data: enabled_plugins: | # Allow clusters, management interface and k8s support [rabbitmq_federation,rabbitmq_management,rabbitmq_peer_discovery_k8s]. 
rabbitmq.conf: | loopback_users.guest = false listeners.tcp.default = 5672 cluster_formation.peer_discovery_backend = rabbit_peer_discovery_k8s cluster_formation.k8s.host = kubernetes.default.svc.cluster.local cluster_formation.k8s.address_type = hostname # expose each service on DNS so hostname cluster_formation.node_cleanup.only_log_warning = true Now pods must be run through a stateful set so it has persistent storage and name # rabbit-statefulset.yaml [StatefulSet] spec: serviceName: rabbitmq replicas: 4 # Can scale as required selector: matchLabels: app: rabbitmq template: metadata: labels: app: rabbitmq spec: serviceAccountName: rabbitmq # Specific user so pods can call back to k8 to see others initContainers: # Certain files need to be writable - init container to copy to rw vol - name: config image: busybox command: ['/bin/sh', '-c', 'cp /tmp/config/rabbitmq.conf /config/rabbitmq.conf && \ ls -l /config/ && cp /tmp/config/enabled_plugins /etc/rabbitmq/enabled_plugins'] volumeMounts: - name: config mountPath: /tmp/config/ readOnly: false - name: config-file mountPath: /config/ - name: plugins-file mountPath: /etc/rabbitmq/ containers: - name: rabbitmq image: rabbitmq:3.8-management ports: - containerPort: 4369 # For clustering to work name: discovery - containerPort: 5672 # For clients to interact with queue name: amqp env: - name: RABBIT_POD_NAME # Get name from k8 api valueFrom: fieldRef: apiVersion: v1 fieldPath: metadata.name - name: RABBIT_POD_NAMESPACE # Get namespace from k8 api valueFrom: fieldRef: fieldPath: metadata.namespace - name: RABBITMQ_NODENAME # Define node name using k8 pod name value: rabbit@$(RABBIT_POD_NAME).rabbitmq.$(RABBIT_POD_NAMESPACE).svc.cluster.local - name: RABBITMQ_USE_LONGNAME # As you have defined a fully qualified domain name value: "true" - name: RABBITMQ_CONFIG_FILE value: "/config/rabbitmq" - name: RABBITMQ_ERLANG_COOKIE valueFrom: secretKeyRef: name: rabbit-secret key: RABBITMQ_ERLANG_COOKIE - name: K8S_HOSTNAME_SUFFIX # 
Define structure of hostnames value: .rabbitmq.$(RABBIT_POD_NAMESPACE).svc.cluster.local volumeMounts: - name: data mountPath: /var/lib/rabbitmq readOnly: false - name: config-file mountPath: /config/ - name: plugins-file mountPath: /etc/rabbitmq/ volumes: - name: config-file emptyDir: {} - name: plugins-file emptyDir: {} - name: config configMap: name: rabbitmq-config defaultMode: 0755 volumeClaimTemplates: - metadata: name: data spec: accessModes: [ "ReadWriteOnce" ] storageClassName: "standard" resources: requests: storage: 50Mi --- # [Service] spec: clusterIP: None # Headless service - every pod gets own domain name ports: - port: 4369 targetPort: 4369 name: discovery - port: 5672 targetPort: 5672 name: amqp selector: app: rabbitmq Apply all these manifests - creates pods and storage one by one kubectl -n rabbits port-forward rabbitmq-0 8080:15672 # To enable mirroring, apply the mirroring policy as before