A Glimpse Behind The Scenes At How We Perform Mass Ingest And Transformation Workflows In the Cloud

How do we orchestrate our mass ingest and transformation workflows when performing some of our largest analyses? Once data is inside Google's Cloud Platform in the right format, the combination of tools like Google Cloud Storage, Compute Engine and BigQuery makes even the largest data processing workflows fairly straightforward and almost infinitely parallelizable. The real bottleneck typically lies in ingesting an external dataset into GCP, in transforming a dataset from one format into another, or both. In the past we've talked about how we use streaming relay transfers for mass ingest into GCS and large Local SSD clusters for high-IO transformations. Here's a list of the primary architectures we've found most effective in the cloud for ingest and transformation workloads:

  • Mass Ingest Via Streaming Relays. For large external datasets that we need to securely transfer into GCS for processing with GCP's AI APIs and other tools, we select the GCP data region geographically closest to the source data center (to minimize transit over commodity networking) and then spin up a cluster of 1-core high-memory VMs with 10GB local disks. Each VM runs a number of fetch scripts, from a few dozen up to hundreds of threads each, depending on the data source's parallelism and bandwidth considerations (arranged in advance and carefully monitored over the course of the transfers in coordination with the data source). Each thread polls a simple central coordination server for the next URL to download and then runs a simple workflow of "wget -q -O - --header '[AUTHORIZATIONTOKENSHERE]' https://someremotesite/[FILENAME] | mbuffer -q -m 50M | gsutil -q cp - gs://yourpathhere/[FILENAME]". This uses wget to securely fetch the remote resource using an authentication token, uses mbuffer to even out any irregularities in the transfer speed and uses gsutil in streaming write mode to stream the results directly to GCS in realtime. In this way, each VM essentially acts as a relay that ingests the data over the commodity internet as physically close to the source data center as possible and then ships it via GCP's global private fiber network directly to GCS. Using gsutil in streaming mode means the data never touches local disk, so a single VM can sustain hundreds of parallel streams totaling 1GB/s or more directly to GCS. In essence, the VMs act as interfaces between the data source and commodity internet on one side and GCP's global high-speed networking and storage on the other.
  • Mass Ingest With Edge Stream Processing. Sometimes we need to access a large external dataset to perform a relatively simple computation on the data in-flight as it arrives, such as processing a collection of multi-GB uncompressed video files to compute certain characteristics about them. If the VM's CPUs can process all of the streams at least as fast as they arrive, we can use a similar model to our streaming relays, but here each VM pipes the streaming results directly to the processing utility rather than to gsutil. For example, let's say we want to run "ccextractor" across a large archive of hundreds of thousands of MPEG2 files that each stream at around 1MB/s. At that rate, ccextractor might only consume a tiny fraction of one percent of CPU per stream. A single VM can sustain several hundred of these streams in parallel, using a pipeline similar to "wget -q -O - --header '[AUTHORIZATIONTOKENSHERE]' https://someremotesite/[FILENAME] | mbuffer -q -m 10M | ccextractor -quiet -out=ttxt -bi - -o ./TRANSCRIPTS/[FILENAME]". Here the relays are again physically located as close as possible to the data source to minimize commodity internet transit, with the streams being processed locally on the relay. A 2GB MPEG2 might yield a 100KB output transcript, meaning that the output that must be shipped from the edge relay to GCS is extremely small. Because the output files are so small, here we don't use streaming gsutil writes, but rather write locally to disk and then ship, minimizing the number of parallel GCS writes and thus conserving available memory.
  • Mass Linear Transformations. In many workflows the data we need is already in GCS, but we must transform it from one format to another, such as parsing a complex nonstandard file format to extract a particular attribute from each record. In cases where processing proceeds linearly through each file from start to finish and the files themselves are relatively large, we use a GCS->GCE->GCS workflow in which each VM opens a file from GCS using "gsutil cat" to stream it in, processes it on-the-fly and writes the results back to GCS using "gsutil" in streaming mode. Thus, a workflow might look like "gsutil cat gs://yourpathhere/[INPUTFILE] | mbuffer -q -m 50M | processingtool | mbuffer -q -m 50M | gsutil -q cp - gs://yourpathhere/[OUTPUTFILE]". Once again, no local disk is required and processing proceeds at the speed of the local CPUs, with the speed to/from GCS automatically adjusting in realtime.
  • Mass Random Large Dataset Transformations. In some transformation workflows the nature of the data or transformation process requires heavy random access across a large dataset, necessitating an extremely high-IOPS storage system. If the dataset is a few TB in size, one option is to construct a Local SSD storage array, which can sustain extremely high IOPS, though this is limited to datasets that can fit in the available Local SSD disks. At present, a single VM can mount 24 Local SSDs into a single 9TB storage array with 2.4 million read / 1.2 million write IOPS and 9.36GB/s read / 4.68GB/s write performance. Of course, this can be multiplied across an entire cluster if the dataset can be split across machines, yielding linear scaling.
  • Mass Random Medium Dataset Transformations. In some transformation workloads, the entire dataset is small enough to fit into memory on large-memory VMs, enabling the use of ramdisks for extreme IO. Given that 200GB RAM VMs are routine in GCE and up to 11.7TB SSI systems are currently available, many workloads will find ramdisks feasible within GCE. In our case, our workflow involved a set of VMs with 200GB of RAM, 100GB of which was carved into a ramdisk and the remaining 100GB used for computation. We used gsutil to stream the datasets from GCS directly to the ramdisk in parallel, then processed the dataset using heavy memory caching and massively random access over the data, writing our results using heavy random IO, with the final results streamed from the ramdisk directly to GCS. A 200GB RAM VM can be spun up and spun down instantly, meaning even a cluster of large-memory VMs can be spun up for just the precise amount of time needed to process the dataset and then instantly decommissioned. The speed of the ramdisk allows orders of magnitude faster turnaround times for medium-sized datasets that fit in memory.
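
To make the streaming relay and linear transformation patterns above a little more concrete, here is a minimal Python sketch of how a worker thread might assemble and run those pipelines. It is an illustration under assumptions rather than our production code: the function names (relay_pipeline, transform_pipeline, run_pipeline, relay_worker) are hypothetical, an in-process queue stands in for the central coordination server (whose protocol isn't described here), and it assumes bash, wget, mbuffer and gsutil are installed on the VM.

```python
import queue
import shlex
import subprocess


def relay_pipeline(url, auth_header, gcs_dest, buffer_size="50M"):
    """Build the fetch | buffer | upload pipeline as a single shell command."""
    stages = [
        ["wget", "-q", "-O", "-", "--header", auth_header, url],
        ["mbuffer", "-q", "-m", buffer_size],
        ["gsutil", "-q", "cp", "-", gcs_dest],  # "-" = streaming write from stdin
    ]
    # shlex.join quotes each argument, so tokens or filenames containing
    # spaces or shell metacharacters cannot break the command line.
    return " | ".join(shlex.join(stage) for stage in stages)


def transform_pipeline(gcs_src, tool_argv, gcs_dest, buffer_size="50M"):
    """Build the GCS -> processing tool -> GCS pipeline (no local disk)."""
    buffer_stage = ["mbuffer", "-q", "-m", buffer_size]
    stages = [
        ["gsutil", "cat", gcs_src],
        buffer_stage,
        list(tool_argv),  # the processing tool and its arguments
        buffer_stage,
        ["gsutil", "-q", "cp", "-", gcs_dest],
    ]
    return " | ".join(shlex.join(stage) for stage in stages)


def run_pipeline(command):
    """Run a pipeline under bash and return its exit status.

    pipefail makes the status nonzero if any stage fails, so a failed
    download is not hidden by a successful final stage.
    """
    return subprocess.run(["bash", "-o", "pipefail", "-c", command]).returncode


def relay_worker(work_queue, auth_header, bucket_prefix, runner=run_pipeline):
    """Drain (filename, url) pairs from the queue, relaying each one to GCS.

    The queue is a stand-in (an assumption) for polling a coordination
    server. Returns the filenames whose pipeline exited nonzero so that
    they can be retried.
    """
    failures = []
    while True:
        try:
            filename, url = work_queue.get_nowait()
        except queue.Empty:
            return failures
        command = relay_pipeline(url, auth_header, f"{bucket_prefix}/{filename}")
        if runner(command) != 0:
            failures.append(filename)
```

In a setup like the one described above, each VM could start a few dozen to a few hundred threads that all call relay_worker against a shared queue. Passing a different runner makes it possible to dry-run the generated commands without touching the network.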

We hope these workflows are helpful and offer a glimpse of how to perform optimized large-scale ingest and transformation workflows in GCP!