The Yahoo Streaming Benchmark is fairly popular but, I will argue, an exceedingly ill-named benchmark. Although it sets out to measure what a stream processor might be capable of, it ends up measuring just about everything other than the stream processor. I'll provide some data backing this up. A laptop will be involved.
If you aren't familiar with the benchmark, this matters mostly because papers use it and reach conclusions that may not be as well supported as they hope. This isn't great for the papers, nor for the stream processing research area generally.
Edit: Comments from the Drizzle authors in the Drizzle section.
The benchmark models a simple ad accounting scenario, where events describing ad views enter the system and those of a certain type are credited to their associated campaign. Every ten seconds the stream processor is expected to report the total ad views for each campaign within those ten seconds. The benchmark as initially defined brings records in as JSON by way of a Kafka queue, and has the stream processor consult Redis both to determine the mapping from advertisement id to campaign id and to receive the results of the streaming computation.
The computation is therefore roughly: i. read JSON from Kafka, ii. filter down to events whose `event_type` field is `purchase`, iii. look up the `ad_id` in a Redis table to determine a `campaign_id`, iv. accumulate counts for each campaign id, and v. at 10 second boundaries emit the aggregates to Redis.
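To make the steps concrete, here is a minimal sketch in Rust of the per-record logic, ignoring Kafka, Redis, and JSON entirely. The field names follow the benchmark, but the types, the `campaign_for` table, and the hard-coded window size are illustrative assumptions rather than anyone's actual implementation:

```rust
use std::collections::HashMap;

// The fields of a view event that the computation actually uses.
struct Event {
    ad_id: u64,
    event_type: u8,   // only one event type ("purchase") is kept
    event_time: u64,  // milliseconds since some epoch
}

// Count qualifying views per (campaign id, 10 second window).
// `campaign_for` stands in for the Redis lookup (or an in-process map).
fn count_views(
    events: &[Event],
    campaign_for: &HashMap<u64, u64>,
) -> HashMap<(u64, u64), u64> {
    let mut counts = HashMap::new();
    for event in events {
        if event.event_type == 0 {                      // ii. filter on event_type
            let campaign = campaign_for[&event.ad_id];  // iii. ad_id -> campaign_id
            let window = event.event_time / 10_000;     // 10 second window index
            *counts.entry((campaign, window)).or_insert(0) += 1;   // iv. accumulate
        }
    }
    counts  // v. the aggregates to emit at window boundaries
}
```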
Yahoo has a blog post describing the benchmark, whose executive summary reads:
Executive Summary - Due to a lack of real-world streaming benchmarks, we developed one to compare Apache Flink, Apache Storm and Apache Spark Streaming. Storm 0.10.0, 0.11.0-SNAPSHOT and Flink 0.10.1 show sub-second latencies at relatively high throughputs with Storm having the lowest 99th percentile latency. Spark streaming 1.5.1 supports high throughputs, but at a relatively higher latency.
I am not an executive, and so was unwilling to accept this summary. Also, "sub-second latencies"? "Relatively high throughputs"? Surely we can learn more about this benchmark.
This learning is impaired by rather bad figures. This is an actual figure (not a screenshot) from the post summarizing their findings (credit, as appropriate, to Yahoo):
If you want to avoid the eyestrain, the x-axis is the fraction of tuples deemed "complete" at various latencies (in milliseconds) along the y-axis, under an offered load of 150,000 tuples per second. Both Storm and Flink complete their work in about a second, at which point Spark appears to start completing its work.
Don't read too much into these results. It turns out they are benchmarking pretty much everything except the underlying stream processors.
There are several moving parts inside the benchmark that are not the stream processor, and over the course of a few blog posts the teams behind Apaches Flink and Spark deconstructed the YSB, repeatedly identifying the bottlenecks and voting them out of the benchmark.
It turns out that doing a call into Redis for each record is not a brilliant idea. You don't actually have to do that (you could wait for the 10 second tick and then make one call for each distinct ad id), but apparently everyone was, and it was slow. All of these stream processors (at least Flink and Spark) have native `join` operators, and could just join against the `(ad_id, campaign_id)` relation within the stream processor. Or rather, Spark can do this and it isn't clear whether Flink yet can (it involves joining a windowed relation against a non-windowed relation), but both systems can certainly just sit on a 1,000 element map from `ad_id` to `campaign_id`, because that is all of 32 kilobytes (there are only 1,000 ad ids in the benchmark).
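For scale, a minimal sketch of that last option: hold the whole mapping in a hash map inside the operator and look campaigns up per record (or per window). The 16-byte identifiers here are stand-ins for the benchmark's UUIDs, and 1,000 entries of two such ids is the quoted 32 kilobytes of raw data:

```rust
use std::collections::HashMap;

fn main() {
    // 1,000 ads, each mapped to one of 100 campaigns; the ids are 16-byte
    // values standing in for the benchmark's UUIDs, so the raw data is
    // 1,000 * 32 bytes = 32 kilobytes. Build it once, keep it in the operator.
    let ad_to_campaign: HashMap<[u8; 16], [u8; 16]> =
        (0u64 .. 1_000)
            .map(|ad| {
                let mut ad_id = [0u8; 16];
                ad_id[..8].copy_from_slice(&ad.to_le_bytes());
                let mut campaign_id = [0u8; 16];
                campaign_id[..8].copy_from_slice(&(ad / 10).to_le_bytes());
                (ad_id, campaign_id)
            })
            .collect();

    // The per-record "join" is now just a hash-map lookup, no Redis involved.
    let some_ad = ad_to_campaign.keys().next().unwrap();
    let campaign = ad_to_campaign[some_ad];
    println!("ad {:?} belongs to campaign {:?}", some_ad, campaign);
}
```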
And so Redis got voted out of the benchmark.
Kafka is where the input data come from, so it is a pretty important part of the process. It is also where the data are initially in textual JSON representation with a bunch of fields that aren't really all that important to the benchmark. Things like user ids, ad types, ip addresses, all of which get discarded pretty much immediately.
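For a sense of what each record carries, here is roughly the shape of a benchmark event as a serde struct (my best reading of the field list; the exact types are guesses). Only the ad id, event type, and event time matter downstream; everything else gets deserialized and immediately dropped:

```rust
use serde::Deserialize;

// Roughly the shape of a Yahoo Streaming Benchmark event. Only `ad_id`,
// `event_type`, and `event_time` are used by the computation; the rest is
// parsed and discarded.
#[derive(Deserialize, Debug)]
struct AdEvent {
    user_id: String,
    page_id: String,
    ad_id: String,
    ad_type: String,
    event_type: String,
    event_time: String,
    ip_address: String,
}

fn main() {
    let json = r#"{"user_id":"u0","page_id":"p0","ad_id":"a0","ad_type":"banner",
                   "event_type":"purchase","event_time":"1500000000000","ip_address":"1.2.3.4"}"#;
    let event: AdEvent = serde_json::from_str(json).unwrap();
    println!("{:?}", event);
}
```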
Data Artisans (the Flink folks) describe in a post how they found that bandwidth from their 10 node Kafka cluster to their 10 node Flink cluster was the next bottleneck. Their topology isn't exactly clear, but part of the problem seems to be that while they use 10 gigabit network links between their compute nodes, they have only a 1 gigabit connection between the Kafka and Flink clusters. This bottlenecks the Flink computation at 3 million records per second, just because they can't get the data out of their Kafka cluster any faster. It may also suggest that their next upgrade should be the network connection rather than an 11th Kafka node.
In any case, the post describes how they move the data generation out of Kafka and into the computation itself. They still behave as if they had received data from Kafka, but can now see the performance of the stream processor more clearly.
At this point, for reference, their 10 node cluster (40 cores total) is able to process 15 million events per second. That is 100x the initial "relatively high throughput".
Last October Databricks (the Spark folks) had their own blog post in which they compare Spark and Flink. If you take a read, it is all a bit complicated because apparently they botched the Flink implementation (performing random access to a length 100 linked list, rather than an array; read about it here). They've updated the post, but I'm not entirely sure which data are corrected and which are not.
But, one change they did make was to shift away from producing and deserializing JSON strings (at least, the corresponding Flink implementation has; the Spark code seems to be walled up). It turns out JSON deserialization is relatively expensive, and when you dispose of it you can crank single-core performance up to anywhere from 1.6 million (DB does Flink) to 2.5 million (DB does Spark) to 4.0 million (dA does Flink) records per second. The Databricks folks bring 10 machines (40 cores) to bear and push this up to 60 million records per second! Wooo!
This rate, 60 million records per second, is now even higher than the "relatively high throughputs" offered by the executive summary: 150,000 records per second. We may even be measuring the stream processors themselves, rather than Yahoo's performance-agnostic infrastructural decisions.
So if we are measuring stream processors, how do they stack up? Are the results actually good, and we should be delighted? Alternately, can we stop the (entertaining, for sure) back and forth between Spark and Flink by landing some extinction level event performance numbers? What system could possibly precipitate such a cataclysm?
Apropos nothing in particular, yesterday I implemented the modified YSB, with in-system non-JSON data generation and neither Kafka nor Redis, in differential dataflow. I've been running it single-threaded while I've been writing this post! The memory consumption is stable at 2.1MB, and the compute hasn't interfered with the bad-ass music I've been listening to (currently: Snap's "The Power").
The differential dataflow fragment looks like this (`links` is the `(ad, campaign)` collection, and `views` is the set of views with the filtered event type at the indicated time):
```rust
let probe =
    links
        .semijoin(&views)                   // attach campaigns to viewed ads
        .map(|(_ad, campaign)| campaign)    // keep only the campaign id
        .consolidate()                      // accumulate counts per campaign
        .inspect(move |x| if inspect { println!("{:?}", x); })
        .probe();                           // report progress, for latency measurement
```
This doesn't look like much, and indeed it isn't. It turns out that if you want tumbling window counts it is super easy in differential dataflow, as it reports for each time (here: window) by how much the counts have changed, which is just the sum for the window. If you want to see the results it prints them out for you; it could put them in Redis if you wanted that instead.
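For context, here is a rough sketch (not the harness I actually ran) of how such a fragment might be driven: `links` and `views` come from differential input sessions, the timestamps stand for 10 second window indices, and the probe tells us when a window's counts are final.

```rust
extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::input::Input;
use differential_dataflow::operators::{Consolidate, Join};

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {

        // Build the dataflow: timestamps are window indices.
        let (mut links, mut views, probe) = worker.dataflow::<usize, _, _>(|scope| {
            let (link_input, links) = scope.new_collection::<(u64, u64), isize>();
            let (view_input, views) = scope.new_collection::<u64, isize>();

            let probe =
                links
                    .semijoin(&views)
                    .map(|(_ad, campaign)| campaign)
                    .consolidate()
                    .probe();

            (link_input, view_input, probe)
        });

        // Load the static (ad, campaign) mapping once, at window 0.
        for ad in 0 .. 1_000u64 { links.insert((ad, ad / 10)); }

        for window in 0usize .. 30 {
            // Feed this window's (already filtered) ad views.
            for ad in 0 .. 1_000u64 { views.insert(ad); }

            // Close the window and run until its counts are final.
            links.advance_to(window + 1); links.flush();
            views.advance_to(window + 1); views.flush();
            while probe.less_than(views.time()) { worker.step(); }
        }
    }).unwrap();
}
```

The `while probe.less_than(...)` loop is also where the latency number comes from: a window's latency is however long that loop runs after the window's last input has been flushed.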
Following the recent Drizzle paper I discarded the first five minutes of measurements and checked out the next five minutes. Every ten seconds I report the throughput as the number of records processed divided by ten, and the latency as the time after the ten second window ends at which timely signals that all 100 counts have been correctly determined. This means I have 30 throughput measurements and 30 latency measurements, summarized below.
So, this looks like roughly 35 million events per second, and latencies that are pretty much 300 microseconds (except for one screw-up).
Does this make any sense? Sure. I mean, what do you have to do for each record? You have to consult a field to determine if you should keep the record, consult a field to see what the timestamp should be, and then insert the ad id with that timestamp. The next step is a differential `join`, and before executing anything differential accumulates its inputs, which it represents as multisets. There are 1,000 distinct ad identifiers (a plausible number for Yahoo's ad business), and after sorting each batch we just fold these results into the totals. Once a window closes we dump the 1,000 results at the join and consolidate the resulting campaign ids. It averages out to about 30ns per record, apparently.
I should stress that I'm working pretty hard to avoid doing any work in generating the data. Like the recent benchmark implementations I'm just round-robining through randomly generated ad identifiers and event types (1,000 and 3 are relatively prime). One could complain that I'm doing too little work here (feel free), but we really aren't supposed to be measuring anything other than the streaming computation, right?
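A minimal sketch of that sort of generator (my own illustration, not the exact code I ran), cycling through ad ids and event types; because 1,000 and 3 are relatively prime, the sequence visits every (ad id, event type) combination:

```rust
fn main() {
    let ads = 1_000u64;     // distinct ad identifiers in the benchmark
    let types = 3u64;       // event types; only one of them is kept

    // Round-robin through (ad id, event type) pairs; since gcd(1000, 3) = 1,
    // every combination appears before the sequence repeats.
    let events = (0u64 ..).map(|i| (i % ads, i % types));

    // Keep roughly one third of the events (say, event type 0 is "purchase").
    let kept = events.take(30).filter(|&(_ad, ty)| ty == 0);
    for (ad, _ty) in kept {
        println!("view of ad {}", ad);
    }
}
```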
Just this past year at SOSP, Berkeley had a paper on Drizzle, a way to slice up Spark computations more finely, to the point that maybe they start to look like streaming computations. The paper is not fundamentally about the Yahoo Streaming Benchmark, but they use it as their main benchmark for assessing the improvements they make over vanilla Spark and Flink. One of the goals of the paper (though not the only one) is to improve latency, and in that effort they arrive at the following result (from their abstract):
Our experiments on a 128 node EC2 cluster show that on the Yahoo Streaming Benchmark, Drizzle can achieve end-to-end record processing latencies of less than 100ms and can get 2–3x lower latency than Spark.
Here's the figure that drives home the point:
The Drizzle-optimized implementation can process 20 million events per second with only 100ms latencies (using a workload-specific optimization), using just 128 computers and 512 cores.
Wait, 128 computers? 512 cores? Don't we get better results with just one laptop core in differential dataflow?
The Drizzle implementation of YSB is, to my understanding, still parsing JSON (though using neither Kafka nor Redis). In my experiments, JSON parsing represents a roughly 35x slowdown (I can parse YSB JSON records at about 900,000 per second using pikkr, a Rust port of mison from MSR). This explains the overhead, as well as the gap between these numbers and Databricks' own report of 60 million records per second on just ten machines.
Knowing only this (and subject to me being corrected), the Drizzle evaluation looks to be less about low-latency stream processing, and more an evaluation of a massively parallel JSON deserialization task. The stream processing component, retiring the 20 million records per second, needs at most one core (and some sweet tunes), and you get a 300x reduction in latency as a bonus (probably in no small part due to not using so many machines, Spark). It's actually a relatively easy streaming computation, not one that needs such a scary system unless you build in JSON deserialization as a requirement.
As a thought experiment: take a read through the Drizzle paper and try to determine which conclusions about system design would still hold without the overheads of JSON deserialization. I don't know that any of the conclusions are wrong (JSON deserialization may be a great stand-in for "non-trivial compute"), but the overhead of deserialization really seems to obscure the actual system bottlenecks. If the world shifts tomorrow and everyone starts using Abomonation, would any of the Drizzle work need to be rolled back and reconsidered? What about the people already using binary formats (e.g. the current Spark and Flink YSB implementations)?
Edit: I failed to point out (but should have done) that the Drizzle paper provides additional datapoints where they achieve higher throughputs at the expense of more latency. When they allow themselves a one second latency, for example, their optimized aggregate throughput increases to roughly 110 million elements per second (informally, the optimization accumulates counts in-place before exchanging them, for an amount of time that increases with their target latency).
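As a generic illustration of that kind of optimization (my sketch, not Drizzle's code): each worker accumulates counts locally, and only the much smaller per-campaign aggregates are exchanged at the window boundary, so longer windows amortize the exchange over more records.

```rust
use std::collections::HashMap;

// Locally pre-aggregate (campaign, count) before any exchange: instead of
// shipping one record per event, a worker ships at most one record per
// campaign per window, trading a bit of latency for a lot of throughput.
struct LocalAggregator {
    counts: HashMap<u64, u64>,
}

impl LocalAggregator {
    fn new() -> Self {
        LocalAggregator { counts: HashMap::new() }
    }
    // Called once per event on this worker; no communication happens here.
    fn observe(&mut self, campaign: u64) {
        *self.counts.entry(campaign).or_insert(0) += 1;
    }
    // Called at the window boundary; these are the records actually exchanged.
    fn drain(&mut self) -> Vec<(u64, u64)> {
        self.counts.drain().collect()
    }
}

fn main() {
    let mut agg = LocalAggregator::new();
    for event in 0u64 .. 1_000_000 { agg.observe(event % 100); }
    // Only 100 aggregates leave this worker, not a million events.
    println!("exchanging {} aggregates", agg.drain().len());
}
```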
The Flink folks recently called for an end to the Yahoo Streaming Benchmark. I'm down with that. If nothing else, let's remove the JSON aspect if we want to evaluate stream processors. Leave the JSON in if you want to evaluate JSON deserializers.
Here's a sassier position: If you are using JSON or text-based serialization formats, be careful talking about performance. Please. It's really hard to trust your conclusions when there is a self-inflicted 35x performance overhead. Learn about extensible binary formats like CapnProto; earn the respect of your peers; stop paying Amazon to boil the oceans for us.
We won't actually learn about stream processors until we start evaluating the stream processor itself. Let's start.