Skip to content

[ML] Frequent Items: use a bitset for deduplication #88943

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/88943.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 88943
summary: "Frequent Items: use a bitset for deduplication"
area: Machine Learning
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@

package org.elasticsearch.xpack.ml.aggs.frequentitemsets;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.LongsRef;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xpack.ml.aggs.frequentitemsets.TransactionStore.TopItemIds;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -30,6 +32,7 @@
* if [a, b] is not in T, [a, b, c] can not be in T either
*/
class CountingItemSetTraverser implements Releasable {
private static final Logger logger = LogManager.getLogger(CountingItemSetTraverser.class);

// start size and size increment for the occurrences stack
private static final int OCCURENCES_SIZE_INCREMENT = 10;
Expand All @@ -48,13 +51,19 @@ class CountingItemSetTraverser implements Releasable {
// growable bit set from java util
private java.util.BitSet visited;

CountingItemSetTraverser(TransactionStore transactionStore, int cacheTraversalDepth, int cacheNumberOfTransactions, long minCount) {
CountingItemSetTraverser(
TransactionStore transactionStore,
TopItemIds topItemIds,
int cacheTraversalDepth,
int cacheNumberOfTransactions,
long minCount
) {
this.transactionStore = transactionStore;

boolean success = false;
try {
// we allocate 2 big arrays, if the 2nd allocation fails, ensure we clean up
this.topItemSetTraverser = transactionStore.getTopItemIdTraverser();
this.topItemSetTraverser = new ItemSetTraverser(topItemIds);
this.topTransactionIds = transactionStore.getTopTransactionIds();
success = true;
} finally {
Expand All @@ -80,11 +89,15 @@ public boolean next(long earlyStopMinCount) throws IOException {
final long totalTransactionCount = transactionStore.getTotalTransactionCount();

int depth = topItemSetTraverser.getNumberOfItems();
long occurencesOfSingleItem = transactionStore.getItemCount(topItemSetTraverser.getItemId());

if (depth == 1) {
// at the 1st level, we can take the count directly from the transaction store
occurencesStack[0] = transactionStore.getItemCount(topItemSetTraverser.getItemId());
occurencesStack[0] = occurencesOfSingleItem;
return true;
} else if (occurencesOfSingleItem < earlyStopMinCount) {
rememberCountInStack(depth, occurencesOfSingleItem);
return true;

// till a certain depth store results in a cache matrix
} else if (depth < cacheTraversalDepth) {
// get the cached skip count
Expand Down Expand Up @@ -187,7 +200,7 @@ public long getCount() {
/**
* Get the count of the item set without the last item
*/
public long getPreviousCount() {
public long getParentCount() {
if (topItemSetTraverser.getNumberOfItems() > 1) {
return occurencesStack[topItemSetTraverser.getNumberOfItems() - 2];
}
Expand All @@ -201,7 +214,7 @@ public boolean hasBeenVisited() {
return true;
}

public boolean hasPredecessorBeenVisited() {
public boolean hasParentBeenVisited() {
if (topItemSetTraverser.getNumberOfItems() > 1) {
return visited.get(topItemSetTraverser.getNumberOfItems() - 2);
}
Expand All @@ -214,7 +227,7 @@ public void setVisited() {
}
}

public void setPredecessorVisited() {
public void setParentVisited() {
if (topItemSetTraverser.getNumberOfItems() > 1) {
visited.set(topItemSetTraverser.getNumberOfItems() - 2);
}
Expand All @@ -228,10 +241,15 @@ public int getNumberOfItems() {
}

/**
* Get the current item set
*
* Get a bitset representation of the current item set
*/
public LongsRef getItemSet() {
return topItemSetTraverser.getItemSet();
public ItemSetBitSet getItemSetBitSet() {
return topItemSetTraverser.getItemSetBitSet();
}

/**
* Get a bitset representation of the parent item set, as reported by the underlying item set traverser.
*
* NOTE(review): presumably the "parent" is the current item set without its last item, mirroring
* the parent count accessor of this class; confirm against ItemSetTraverser.
*
* @return the value of {@code topItemSetTraverser.getParentItemSetBitSet()}
*/
public ItemSetBitSet getParentItemSetBitSet() {
return topItemSetTraverser.getParentItemSetBitSet();
}

/**
Expand All @@ -250,7 +268,7 @@ public boolean atLeaf() {

@Override
public void close() {
Releasables.close(topItemSetTraverser, topTransactionIds);
Releasables.close(topTransactionIds);
}

// remember the count in the stack without tracking push and pop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.LongsRef;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -25,6 +24,7 @@
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.ml.aggs.frequentitemsets.FrequentItemSetCollector.FrequentItemSet;
import org.elasticsearch.xpack.ml.aggs.frequentitemsets.TransactionStore.TopItemIds;
import org.elasticsearch.xpack.ml.aggs.frequentitemsets.mr.AbstractItemSetMapReducer;
import org.elasticsearch.xpack.ml.aggs.frequentitemsets.mr.ItemSetMapReduceValueSource.Field;

Expand Down Expand Up @@ -338,17 +338,17 @@ private static EclatResult eclat(
final long totalTransactionCount = transactionStore.getTotalTransactionCount();
Map<String, Object> profilingInfo = null;
long minCount = (long) Math.ceil(totalTransactionCount * minimumSupport);
FrequentItemSetCollector collector = new FrequentItemSetCollector(transactionStore, size, minCount);
long numberOfSetsChecked = 0;

if (profilingInfoReduce != null) {
profilingInfo = new LinkedHashMap<>(profilingInfoReduce);
profilingInfo.put("start_min_count_eclat", minCount);
}

try (
TopItemIds topItemIds = transactionStore.getTopItemIds();
CountingItemSetTraverser setTraverser = new CountingItemSetTraverser(
transactionStore,
topItemIds,
BITSET_CACHE_TRAVERSAL_DEPTH,
(int) Math.min(MAX_BITSET_CACHE_NUMBER_OF_TRANSACTIONS, totalTransactionCount),
minCount
Expand All @@ -360,7 +360,8 @@ private static EclatResult eclat(
minCount,
transactionStore.getTotalItemCount()
);

FrequentItemSetCollector collector = new FrequentItemSetCollector(transactionStore, topItemIds, size, minCount);
long numberOfSetsChecked = 0;
long previousMinCount = 0;

while (setTraverser.next(minCount)) {
Expand Down Expand Up @@ -402,8 +403,11 @@ private static EclatResult eclat(
if (setTraverser.atLeaf()
&& setTraverser.hasBeenVisited() == false
&& setTraverser.getCount() >= minCount
&& setTraverser.getItemSet().length >= minimumSetSize) {
minCount = collector.add(setTraverser.getItemSet(), setTraverser.getCount());
&& setTraverser.getItemSetBitSet().cardinality() >= minimumSetSize) {

logger.trace("add after prune");

minCount = collector.add(setTraverser.getItemSetBitSet(), setTraverser.getCount());
// no need to set visited, as we are on a leaf
}

Expand All @@ -418,19 +422,17 @@ private static EclatResult eclat(
*
* iff the count of the subset is higher, collect
*/
if (setTraverser.hasPredecessorBeenVisited() == false
&& setTraverser.getItemSet().length > minimumSetSize
&& setTraverser.getCount() < setTraverser.getPreviousCount()) {
if (setTraverser.hasParentBeenVisited() == false
&& setTraverser.getItemSetBitSet().cardinality() > minimumSetSize
&& setTraverser.getCount() < setTraverser.getParentCount()) {
// add the set without the last item

LongsRef subItemSet = setTraverser.getItemSet().clone();
subItemSet.length--;
minCount = collector.add(subItemSet, setTraverser.getPreviousCount());
minCount = collector.add(setTraverser.getParentItemSetBitSet(), setTraverser.getParentCount());
}

// closed set criteria: the predecessor is no longer of interest: either we reported in the previous step or we found a
// super set
setTraverser.setPredecessorVisited();
setTraverser.setParentVisited();

/**
* Iff the traverser reached a leaf, the item set can not be further expanded, e.g. we reached [f]:
Expand All @@ -445,8 +447,8 @@ private static EclatResult eclat(
*
* Note: this also covers the last item, e.g. [a, x, y]
*/
if (setTraverser.atLeaf() && setTraverser.getItemSet().length >= minimumSetSize) {
minCount = collector.add(setTraverser.getItemSet(), setTraverser.getCount());
if (setTraverser.atLeaf() && setTraverser.getItemSetBitSet().cardinality() >= minimumSetSize) {
minCount = collector.add(setTraverser.getItemSetBitSet(), setTraverser.getCount());
// no need to set visited, as we are on a leaf
}

Expand Down
Loading