The GDELT Project

Simple Experiments In Scalable Queuing: Quickly Getting Started With GCP's Pub/Sub

For those interested in the scalable global queuing system that is GCP's Pub/Sub, but not sure where to start, here's a trivial getting started guide to help you experiment with it.

First, create a "topic" (essentially the queue that we write entries to):

gcloud pubsub topics create topic1

Now we create two "subscriptions" against this topic. By default, Pub/Sub provides at-least-once delivery, meaning a message can occasionally be delivered more than once; here we ask that exactly-once delivery be strictly enforced for sub1:

gcloud pubsub subscriptions create sub1 --topic=topic1 --enable-exactly-once-delivery
gcloud pubsub subscriptions create sub2 --topic=topic1

We can verify details of a subscription:

gcloud pubsub subscriptions describe sub1

While the "topic" is our queue, we can have multiple "subscriptions" that act as independent mirrors of it. When an item is written to a topic, it appears in every subscription attached to that topic. But these subscriptions are independent queues, meaning that when an item is pulled from one subscription it remains in the others. For example, say we write an item to topic1 above and then "pull" the next available record from sub1: we get our item. If we pull from sub1 again, we get no records, since that queue is now empty. But the item remains in sub2, so if we now pull from sub2 we get the item.
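This fan-out behavior can be sketched in plain Python as a conceptual model only (no Pub/Sub involved, and the function names are purely illustrative): publishing appends a copy of the item to every subscription's own queue.

```python
from collections import deque

# Conceptual model: each subscription is its own independent queue,
# and a topic fans every published item out to all of them.
subscriptions = {"sub1": deque(), "sub2": deque()}

def publish(item):
    for queue in subscriptions.values():
        queue.append(item)  # every subscription gets its own copy

def pull(sub_name):
    queue = subscriptions[sub_name]
    return queue.popleft() if queue else None  # None = queue is empty

publish("gs://yourbucket/yourvideo1.mp4")

print(pull("sub1"))  # gs://yourbucket/yourvideo1.mp4
print(pull("sub1"))  # None -- sub1 is now empty
print(pull("sub2"))  # gs://yourbucket/yourvideo1.mp4 -- still in sub2
```

Pulling from sub1 empties sub1 only; sub2's copy is untouched until sub2 pulls it.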

Why might this be useful? Imagine we have an ingest server streaming in video files from an outside source, and multiple processes we want to run on each video: Chirp ASR, the Video AI API for OCR and visual assessment, Gemini Pro Vision on the opening minute of the clip, etc. To do this, as each MP4 file is written to GCS, we write its path to topic1, and we create three subscribers: "sub-chirp", "sub-videoapi" and "sub-gemini". As each video is written to topic1, it becomes available in all three subscriber queues, and each process can pull videos from its queue at its leisure. So if we write 100 videos to topic1 and Chirp runs really quickly, it might exhaust the list of videos while Gemini takes much longer to work its way through its queue. All of this is transparently taken care of for us.

Now let's try writing a set of videos to our topic:

time gcloud pubsub topics publish topic1 --message='{"video": "gs://yourbucket/yourvideo1.mp4"}' --format="json"
time gcloud pubsub topics publish topic1 --message='{"video": "gs://yourbucket/yourvideo2.mp4"}' --format="json"
time gcloud pubsub topics publish topic1 --message='{"video": "gs://yourbucket/yourvideo3.mp4"}' --format="json"
time gcloud pubsub topics publish topic1 --message='{"video": "gs://yourbucket/yourvideo4.mp4"}' --format="json"
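Since downstream consumers will parse the message body as JSON, it is worth generating strict JSON (double quotes, no stray whitespace) rather than hand-typing it. A stdlib-only sketch, where build_message is a hypothetical helper name:

```python
import json

def build_message(video_path: str) -> str:
    # json.dumps guarantees strict JSON that any consumer can parse,
    # avoiding the single-quote / trailing-whitespace pitfalls of
    # hand-written message bodies.
    return json.dumps({"video": video_path})

msg = build_message("gs://yourbucket/yourvideo1.mp4")
print(msg)  # {"video": "gs://yourbucket/yourvideo1.mp4"}
```

The resulting string is what you would pass to the publish command's --message flag.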

Now we'll read the first video from sub1, telling Pub/Sub to automatically remove the record from the queue (rather than waiting for us to process the video and confirm that we were successful). By default, when you pull a message from the queue, you have to acknowledge it when you're done processing it; otherwise, after a period of time it is released back to the queue, making it trivial to build fault-tolerant systems:

time gcloud pubsub subscriptions pull sub1 --auto-ack --format="json(ackId, message.attributes, message.data.decode(\"base64\").decode(\"utf-8\"), message.messageId, message.publishTime)"

This takes around 1-2 seconds and yields:

[
  {
    "ackId": "XYZ...",
    "message": {
      "data": "{\"video\":\"gs://yourbucket/yourvideo1.mp4\"}",
      "messageId": "123...",
      "publishTime": "2024-01-16T19:13:22.705Z"
    }
  }
]
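The --format projection above base64-decodes message.data for display; when retrieving messages through the raw API instead, the data field arrives base64-encoded and must be decoded before the JSON can be parsed. A stdlib-only sketch (the encoded payload below simply mirrors the example output above):

```python
import base64
import json

# As delivered by the raw Pub/Sub API, message.data is base64-encoded.
# Here we fabricate such a value from the example payload.
raw_data = base64.b64encode(b'{"video":"gs://yourbucket/yourvideo1.mp4"}')

# Step 1: base64-decode back to the original UTF-8 bytes.
# Step 2: parse the JSON message body.
payload = json.loads(base64.b64decode(raw_data).decode("utf-8"))
print(payload["video"])  # gs://yourbucket/yourvideo1.mp4
```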

Now let's request the next two records from sub1:

time gcloud pubsub subscriptions pull sub1 --auto-ack --limit 2 --format="json(ackId, message.attributes, message.data.decode(\"base64\").decode(\"utf-8\"), message.messageId, message.publishTime)"

This yields:

[
  {
    "ackId": "XYZ...",
    "message": {
      "data": "{\"video\":\"gs://yourbucket/yourvideo2.mp4\"}",
      "messageId": "123...",
      "publishTime": "2024-01-16T19:13:22.805Z"
    }
  },
  {
    "ackId": "ABC...",
    "message": {
      "data": "{\"video\":\"gs://yourbucket/yourvideo3.mp4\"}",
      "messageId": "124...",
      "publishTime": "2024-01-16T19:13:22.905Z"
    }
  }
]

What about our second subscription? Let's pull the first record from it.

time gcloud pubsub subscriptions pull sub2 --auto-ack --format="json(ackId, message.attributes, message.data.decode(\"base64\").decode(\"utf-8\"), message.messageId, message.publishTime)"

This yields the following. Note that this is the first video: while we already pulled it from sub1, remember that sub2 gets its own copy of all of the records, so video 1 still exists in this queue:

[
  {
    "ackId": "XYZ...",
    "message": {
      "data": "{\"video\":\"gs://yourbucket/yourvideo1.mp4\"}",
      "messageId": "123...",
      "publishTime": "2024-01-16T19:13:22.705Z"
    }
  }
]

Using the CLI to fetch a single record at a time is cumbersome. What if we use the simple Python script from the Pub/Sub demo?

First install the necessary Python library (this pulls in google-api-core and its other dependencies automatically):

pip install google-cloud-pubsub

Then write the following script to pubsub.py (using ":set paste" in vi so the indentation survives pasting) and run it:

vi pubsub.py
:set paste
python3 ./pubsub.py

The script:

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer): replace these with your own values.
project_id = "yourgcpprojectid"
subscription_id = "sub1"
# Number of seconds the subscriber should listen for messages.
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

This may take a while to start returning messages, but eventually you'll get a stream of records as they are written.

Note that errors like the following are nothing to worry about: they just mean multiple clients are contending for the same records in the same subscription and Pub/Sub is correctly handing out only one copy of each. For example, if you have multiple processes all reading videos from the queue and submitting them to Chirp, under certain circumstances they might each receive the same record; Pub/Sub then recognizes this and tells all but one of the processes to ignore it:

google.cloud.pubsub_v1.subscriber.exceptions.AcknowledgeError: None AcknowledgeStatus.INVALID_ACK_ID
AcknowledgeError when lease-modacking a message.

When you're done you can clean up via:

gcloud pubsub topics delete topic1
gcloud pubsub subscriptions delete sub1 
gcloud pubsub subscriptions delete sub2

It's quite literally that simple. The underlying service is fully managed, meaning you simply read from and write to your queues and let GCP handle the rest. Queues can be global, meaning you can have multiple VMs in multiple GCP regions across the world pushing records to, and pulling records from, the queue, and Pub/Sub handles everything for you.

While Pub/Sub is exceptionally powerful, it is geared towards high-volume applications that can handle batches of records at a time, rather than fine-grained single-record sharding. If your application is processing millions of records in batches, Pub/Sub is an ideal fit. In contrast, if you were running a dozen GPU VMs, each with a transcoder that pulled a GCS video filename from the queue, processed it, and then requested the next video, you would end up with a lot of empty pull responses even when there are items on the queue: you could issue a dozen pull requests in a row and get nothing back each time. Likewise, if you run a streaming pull script (like the one above) on multiple VMs, the first one to start will receive a steady stream of records, but copies subsequently started on a dozen other VMs will be starved, receiving nothing except a brief burst every minute or so. This is a byproduct of Pub/Sub's design: it is purpose-built for massive-scale queuing, rather than fine-precision single-record round-robin sharding.