I have a search service that writes the frequency of queries for the last 1 min to Cassandra as shown below. Now I need to aggregate the frequency over the last 1 hr, i.e., calculate the sum of frequencies for each query in the last 1 hr. Assume the number of unique queries in a 1 hr window is 5 million. I have three aggregator nodes that read from Cassandra, do the aggregation, and write the result back to a different table. But I am not sure how to distribute the workload among the aggregator nodes, i.e., how to coordinate the work so that each record is processed by exactly one node. If it were SQL, maybe I could use transactions to atomically read a few hundred records and mark them as being processed, but I don't know how to achieve this in Cassandra.

This seems like a common problem that shows up in many systems, e.g., calculating product sales rank by category in an e-commerce application, or counting how often each YouTube video was watched in the last hour.
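One coordination-free approach (a sketch, not from the thread; the node count and hashing scheme are assumptions for illustration): partition the key space deterministically, so each aggregator node owns a disjoint slice of queries and no locking or "mark as processed" step is needed.

```python
import hashlib

NUM_NODES = 3  # number of aggregator nodes (assumption)

def owner_node(query: str, num_nodes: int = NUM_NODES) -> int:
    # Stable hash so every node independently agrees on who owns each key.
    digest = hashlib.md5(query.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_nodes

def aggregate_for_node(node_id: int, minute_rows):
    # minute_rows: iterable of (query, freq) rows from the per-minute table.
    # Each node scans the hour's rows but only sums the keys it owns.
    totals = {}
    for query, freq in minute_rows:
        if owner_node(query) == node_id:
            totals[query] = totals.get(query, 0) + freq
    return totals
```

Because `owner_node` is deterministic, each (query, hour) total is produced by exactly one node. The trade-off is that every node still reads all rows unless the per-minute table is partitioned by the same hash.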
Is this for interview prep? Why not update the frequency at query time instead (asynchronously) so the numbers are already ready for you when you need them? In that case count-min sketch would be good as the above poster said. If you don’t want to do it at query time for some reason, that makes me think this should be done in the data warehouse.
This is for interview prep. By query time, do you mean the search service maintains this count-min sketch? If so, we need a separate sketch for each hour, right? Also, other downstream systems need to know the unique queries that happened in the 1 hr window so that they can generate (query, count) tuples. What's the best way to maintain this unique-queries data?

As you said, the search service can write to DFS and we can have a MapReduce job to slice and reduce.
I was thinking that the search service sends an asynchronous update (maybe via Kafka) to the count service after handling each search request. Then the separate count service would maintain the sketch. Regarding time intervals, if you don’t need a rolling 1 hr window, you could just save the sketch every hour and make a new blank one.
The count min sketch doesn’t lend itself very well to generating a list of entries, because it’s just based on hash functions and doesn’t store the keys explicitly. If you want to send query count tuples downstream, you’d need the set of queries made in the hour in order to ask the sketch for each. If you’re storing such a set, that’s already O(n) space, so it may be better to just have a distributed hash map of counts instead of a sketch. Especially because the hash map wouldn’t have the sketch’s probabilistic inaccuracies. Consistent hashing for the sharding of the hash map would probably be a good idea in case of load spikes. Sorry if any of this is obvious 😅
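To make that point concrete, here is a minimal count-min sketch (an illustrative toy, not production code): counts live in a 2-D array indexed only by hashes, so the keys themselves are never recoverable from the sketch.

```python
import hashlib

class CountMinSketch:
    def __init__(self, width=1024, depth=4):
        self.width, self.depth = width, depth
        self.table = [[0] * width for _ in range(depth)]

    def _col(self, row, key):
        # One hash per row, derived by salting the key with the row index.
        digest = hashlib.md5(f"{row}:{key}".encode("utf-8")).hexdigest()
        return int(digest, 16) % self.width

    def add(self, key, count=1):
        for row in range(self.depth):
            self.table[row][self._col(row, key)] += count

    def estimate(self, key):
        # Collisions only inflate cells, so the minimum over rows is an
        # overestimate-only approximation of the true count.
        return min(self.table[row][self._col(row, key)]
                   for row in range(self.depth))
```

`estimate` answers for any key you ask about, but nothing in `table` tells you which keys were ever added; recovering that set is exactly the O(n) problem described above.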
Can’t he make a CMS for every hour, Airbnb?
I would make a CMS, then dump results every hour and reset it, if you don’t need rolling updates. If you need rolling updates, the problem gets more interesting.
I’m not sure how to do a sliding 1 hour window. Would something like this work? — Every time a query is processed, we increment it in the distributed hash map, and also schedule 1 hour from now a decrement of the same key in the hash map. When a count in the hash map reaches 0, remove the corresponding query. Then the hash map should always be up to date and snapshot-able.
If that does work, you probably don’t even need an async job framework to schedule the decrement. You could just reuse the Kafka queue of queries from a different offset, and apply decrements from it instead of increments. Since reading a message in Kafka isn’t destructive.
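A toy batch simulation of that scheme (names assumed; the real version would be two Kafka consumers on the same topic, one lagging an hour behind the other):

```python
from collections import defaultdict

WINDOW = 3600  # sliding window size in seconds (1 hour)

def rolling_counts(events, now):
    # events: list of (timestamp, query) pairs, like messages on the query topic.
    counts = defaultdict(int)
    for ts, query in events:
        if ts <= now:
            counts[query] += 1   # "live" consumer: increment on arrival
        if ts <= now - WINDOW:
            counts[query] -= 1   # "lagging" consumer: decrement 1 hr later
    # Keys whose count fell to 0 have left the window entirely.
    return {q: c for q, c in counts.items() if c > 0}
```

With `now = 4000`, an event at `ts = 100` has both its increment and its decrement applied and drops out, while an event at `ts = 3900` is still inside the window and counted.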
For rolling-time updates: make a web service that looks up your CMS and adds a rolling timestamp count. The upstream service can trigger this service directly, or send a request to a Kafka queue.
Can you elaborate on how the rolling timestamp count works?
It would need an additional hash keyed by ingestion time: query → count. If you need to calculate the frequency for arbitrary hours in the past, this hash gets bigger. Typically the hash is purged after x hours, and this is a heavy-hitters problem. You could store it outside the DB in cheaper storage.
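A sketch of that ingestion-time hash under assumed names: counts are bucketed per hour, a rolling view sums the last N buckets on read, and old buckets are purged past a retention horizon.

```python
from collections import defaultdict

RETENTION_HOURS = 24  # assumption: purge buckets older than this

class HourlyCounts:
    def __init__(self):
        # hour bucket (epoch seconds // 3600) -> query -> count
        self.buckets = defaultdict(lambda: defaultdict(int))

    def record(self, query, ts):
        self.buckets[ts // 3600][query] += 1

    def last_hours(self, now, hours=1):
        # Sum the most recent `hours` buckets into a rolling view.
        totals = defaultdict(int)
        current = now // 3600
        for h in range(current - hours + 1, current + 1):
            for query, count in self.buckets.get(h, {}).items():
                totals[query] += count
        return dict(totals)

    def purge(self, now):
        # Drop buckets past the retention horizon ("purge after x hours").
        cutoff = now // 3600 - RETENTION_HOURS
        for h in [h for h in self.buckets if h < cutoff]:
            del self.buckets[h]
```

The storage cost grows with the number of retained hours, which is why the thread suggests purging aggressively or moving old buckets to cheaper storage.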
Nvm, I think you are right, Airbnb: you have to reset on read. Can’t use O(n) additional storage (unless the DB cannot handle more writes due to heavy load).
Can we do an exponential growth rate estimator?
If the query ID is the partition key and you also include a time bucket for the 1 hr window in the partition key, can you count the list size for 1 hr? For these use cases, I would prefer Redis over Cassandra.
Run aggregations in Flink and write to C*.
Count-min sketch; look up the Splunk system design too.