Skip to content

Commit b4a5356

Browse files
Update user lifecycle tracking to V3
1 parent 90899e0 commit b4a5356

File tree

20 files changed

+910
-637
lines changed

20 files changed

+910
-637
lines changed

dd-java-agent/appsec/src/main/java/com/datadog/appsec/AppSecSystem.java

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,6 @@
88
import com.datadog.appsec.event.ReplaceableEventProducerService;
99
import com.datadog.appsec.gateway.GatewayBridge;
1010
import com.datadog.appsec.powerwaf.PowerWAFModule;
11-
import com.datadog.appsec.user.AppSecEventTrackerImpl;
1211
import com.datadog.appsec.util.AbortStartupException;
1312
import com.datadog.appsec.util.StandardizedLogging;
1413
import datadog.appsec.api.blocking.Blocking;
@@ -99,7 +98,7 @@ private static void doStart(SubscriptionService gw, SharedCommunicationObjects s
9998

10099
Blocking.setBlockingService(new BlockingServiceImpl(REPLACEABLE_EVENT_PRODUCER));
101100

102-
AppSecEventTracker.setEventTracker(new AppSecEventTrackerImpl());
101+
AppSecEventTracker.setEventTracker(new AppSecEventTracker());
103102

104103
STARTED.set(true);
105104

dd-java-agent/appsec/src/main/java/com/datadog/appsec/event/data/KnownAddresses.java

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -111,6 +111,8 @@ public interface KnownAddresses {
111111

112112
Address<String> USER_ID = new Address<>("usr.id");
113113

114+
Address<String> USER_LOGIN = new Address<>("usr.login");
115+
114116
Address<String> SESSION_ID = new Address<>("usr.session_id");
115117

116118
/** The URL of a network resource being requested (outgoing request) */
@@ -195,6 +197,8 @@ static Address<?> forName(String name) {
195197
return SERVER_GRAPHQL_ALL_RESOLVERS;
196198
case "usr.id":
197199
return USER_ID;
200+
case "usr.login":
201+
return USER_LOGIN;
198202
case "usr.session_id":
199203
return SESSION_ID;
200204
case "server.io.net.url":

dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/AppSecRequestContext.java

Lines changed: 29 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@
88
import com.datadog.appsec.report.AppSecEvent;
99
import com.datadog.appsec.util.StandardizedLogging;
1010
import datadog.trace.api.Config;
11+
import datadog.trace.api.UserIdCollectionMode;
1112
import datadog.trace.api.http.StoredBodySupplier;
1213
import datadog.trace.api.internal.TraceSegment;
1314
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
@@ -125,6 +126,10 @@ public class AppSecRequestContext implements DataBundle, Closeable {
125126

126127
// keep a reference to the last published usr.id
127128
private volatile String userId;
129+
private volatile UserIdCollectionMode userIdSource;
130+
// keep a reference to the last published usr.login
131+
private volatile String userLogin;
132+
private volatile UserIdCollectionMode userLoginSource;
128133
// keep a reference to the last published usr.session_id
129134
private volatile String sessionId;
130135

@@ -435,6 +440,30 @@ public void setUserId(String userId) {
435440
this.userId = userId;
436441
}
437442

443+
public UserIdCollectionMode getUserIdSource() {
444+
return userIdSource;
445+
}
446+
447+
public void setUserIdSource(UserIdCollectionMode userIdSource) {
448+
this.userIdSource = userIdSource;
449+
}
450+
451+
public String getUserLogin() {
452+
return userLogin;
453+
}
454+
455+
public void setUserLogin(String userLogin) {
456+
this.userLogin = userLogin;
457+
}
458+
459+
public UserIdCollectionMode getUserLoginSource() {
460+
return userLoginSource;
461+
}
462+
463+
public void setUserLoginSource(UserIdCollectionMode userLoginSource) {
464+
this.userLoginSource = userLoginSource;
465+
}
466+
438467
public void setSessionId(String sessionId) {
439468
this.sessionId = sessionId;
440469
}

dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java

Lines changed: 189 additions & 49 deletions
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,16 @@
11
package com.datadog.appsec.gateway;
22

33
import static com.datadog.appsec.event.data.MapDataBundle.Builder.CAPACITY_0_2;
4+
import static com.datadog.appsec.event.data.MapDataBundle.Builder.CAPACITY_3_4;
45
import static com.datadog.appsec.event.data.MapDataBundle.Builder.CAPACITY_6_10;
56
import static com.datadog.appsec.gateway.AppSecRequestContext.DEFAULT_REQUEST_HEADERS_ALLOW_LIST;
67
import static com.datadog.appsec.gateway.AppSecRequestContext.REQUEST_HEADERS_ALLOW_LIST;
78
import static com.datadog.appsec.gateway.AppSecRequestContext.RESPONSE_HEADERS_ALLOW_LIST;
9+
import static datadog.trace.api.UserIdCollectionMode.ANONYMIZATION;
10+
import static datadog.trace.api.UserIdCollectionMode.DISABLED;
11+
import static datadog.trace.api.UserIdCollectionMode.SDK;
12+
import static datadog.trace.api.telemetry.LogCollector.SEND_TELEMETRY;
13+
import static datadog.trace.util.Strings.toHexString;
814

915
import com.datadog.appsec.AppSecSystem;
1016
import com.datadog.appsec.api.security.ApiSecurityRequestSampler;
@@ -22,7 +28,6 @@
2228
import com.datadog.appsec.report.AppSecEventWrapper;
2329
import datadog.trace.api.Config;
2430
import datadog.trace.api.UserIdCollectionMode;
25-
import datadog.trace.api.function.TriFunction;
2631
import datadog.trace.api.gateway.Events;
2732
import datadog.trace.api.gateway.Flow;
2833
import datadog.trace.api.gateway.IGSpanInfo;
@@ -41,6 +46,8 @@
4146
import java.net.URISyntaxException;
4247
import java.nio.charset.Charset;
4348
import java.nio.charset.StandardCharsets;
49+
import java.security.MessageDigest;
50+
import java.security.NoSuchAlgorithmException;
4451
import java.util.ArrayList;
4552
import java.util.Arrays;
4653
import java.util.Collection;
@@ -51,7 +58,9 @@
5158
import java.util.Map;
5259
import java.util.Set;
5360
import java.util.concurrent.ConcurrentHashMap;
61+
import java.util.concurrent.atomic.AtomicBoolean;
5462
import java.util.regex.Pattern;
63+
import java.util.stream.Collectors;
5564
import org.slf4j.Logger;
5665
import org.slf4j.LoggerFactory;
5766

@@ -65,6 +74,10 @@ public class GatewayBridge {
6574
private static final Pattern QUERY_PARAM_SPLITTER = Pattern.compile("&");
6675
private static final Map<String, List<String>> EMPTY_QUERY_PARAMS = Collections.emptyMap();
6776

77+
private static final int HASH_SIZE_BYTES = 16; // 128 bits
78+
private static final String ANON_PREFIX = "anon_";
79+
private static final AtomicBoolean SHA_MISSING_REPORTED = new AtomicBoolean(false);
80+
6881
/** User tracking tags that will force the collection of request headers */
6982
private static final String[] USER_TRACKING_TAGS = {
7083
"appsec.events.users.login.success.track", "appsec.events.users.login.failure.track"
@@ -91,7 +104,8 @@ public class GatewayBridge {
91104
private volatile DataSubscriberInfo ioNetUrlSubInfo;
92105
private volatile DataSubscriberInfo ioFileSubInfo;
93106
private volatile DataSubscriberInfo sessionIdSubInfo;
94-
private final ConcurrentHashMap<Address<String>, DataSubscriberInfo> userIdSubInfo =
107+
private volatile DataSubscriberInfo userIdSubInfo;
108+
private final ConcurrentHashMap<String, DataSubscriberInfo> loginEventSubInfo =
95109
new ConcurrentHashMap<>();
96110
private volatile DataSubscriberInfo execCmdSubInfo;
97111
private volatile DataSubscriberInfo shellCmdSubInfo;
@@ -136,13 +150,10 @@ public void init() {
136150
subscriptionService.registerCallback(EVENTS.networkConnection(), this::onNetworkConnection);
137151
subscriptionService.registerCallback(EVENTS.fileLoaded(), this::onFileLoaded);
138152
subscriptionService.registerCallback(EVENTS.requestSession(), this::onRequestSession);
139-
subscriptionService.registerCallback(EVENTS.userId(), this.onUserEvent(KnownAddresses.USER_ID));
140-
subscriptionService.registerCallback(
141-
EVENTS.loginSuccess(), this.onUserEvent(KnownAddresses.LOGIN_SUCCESS));
142-
subscriptionService.registerCallback(
143-
EVENTS.loginFailure(), this.onUserEvent(KnownAddresses.LOGIN_FAILURE));
144153
subscriptionService.registerCallback(EVENTS.execCmd(), this::onExecCmd);
145154
subscriptionService.registerCallback(EVENTS.shellCmd(), this::onShellCmd);
155+
subscriptionService.registerCallback(EVENTS.user(), this::onUser);
156+
subscriptionService.registerCallback(EVENTS.loginEvent(), this::onLoginEvent);
146157

147158
if (additionalIGEvents.contains(EVENTS.requestPathParams())) {
148159
subscriptionService.registerCallback(EVENTS.requestPathParams(), this::onRequestPathParams);
@@ -153,55 +164,157 @@ public void init() {
153164
}
154165
}
155166

156-
private TriFunction<RequestContext, UserIdCollectionMode, String, Flow<Void>> onUserEvent(
157-
final Address<String> address) {
158-
return (ctx_, mode, userId) -> {
159-
final AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
160-
if (userId == null || ctx == null) {
167+
private Flow<Void> onUser(
168+
final RequestContext ctx_, final UserIdCollectionMode mode, final String originalUser) {
169+
if (mode == DISABLED) {
170+
return NoopFlow.INSTANCE;
171+
}
172+
final String user = anonymizeUser(mode, originalUser);
173+
if (user == null) {
174+
return NoopFlow.INSTANCE;
175+
}
176+
final AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
177+
if (ctx == null) {
178+
return NoopFlow.INSTANCE;
179+
}
180+
final TraceSegment segment = ctx_.getTraceSegment();
181+
182+
// span with ASM data
183+
segment.setTagTop(Tags.ASM_KEEP, true);
184+
segment.setTagTop(Tags.PROPAGATED_APPSEC, true);
185+
186+
// skip event if we have an SDK one
187+
if (mode != SDK) {
188+
segment.setTagTop("_dd.appsec.usr.id", user);
189+
if (ctx.getUserIdSource() == SDK) {
161190
return NoopFlow.INSTANCE;
162191
}
163-
final TraceSegment segment = ctx_.getTraceSegment();
164-
// user id can be set by the SDK overriding the auto event, always update the segment
165-
segment.setTagTop("usr.id", userId);
166-
segment.setTagTop("_dd.appsec.user.collection_mode", mode.shortName());
167-
final List<Address<?>> addresses = new ArrayList<>(2);
168-
final boolean newUserId = !userId.equals(ctx.getUserId());
169-
if (newUserId) {
170-
// unlikely that multiple threads will update the value at the same time
171-
ctx.setUserId(userId);
172-
addresses.add(KnownAddresses.USER_ID);
192+
}
193+
194+
// update span tags
195+
segment.setTagTop("usr.id", user);
196+
segment.setTagTop("_dd.appsec.user.collection_mode", mode.fullName());
197+
198+
// update current context with new user id
199+
ctx.setUserIdSource(mode);
200+
final boolean newUserId = !user.equals(ctx.getUserId());
201+
if (!newUserId) {
202+
return NoopFlow.INSTANCE;
203+
}
204+
ctx.setUserId(user);
205+
206+
// call waf if we have a new user id
207+
while (true) {
208+
DataSubscriberInfo subInfo = userIdSubInfo;
209+
if (subInfo == null) {
210+
subInfo = producerService.getDataSubscribers(KnownAddresses.USER_ID);
211+
userIdSubInfo = subInfo;
173212
}
174-
if (address != KnownAddresses.USER_ID) {
175-
addresses.add(address);
213+
if (subInfo == null || subInfo.isEmpty()) {
214+
return NoopFlow.INSTANCE;
215+
}
216+
DataBundle bundle =
217+
new MapDataBundle.Builder(CAPACITY_0_2).add(KnownAddresses.USER_ID, user).build();
218+
try {
219+
GatewayContext gwCtx = new GatewayContext(false);
220+
return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx);
221+
} catch (ExpiredSubscriberInfoException e) {
222+
userIdSubInfo = null;
223+
}
224+
}
225+
}
226+
227+
private Flow<Void> onLoginEvent(
228+
final RequestContext ctx_,
229+
final UserIdCollectionMode mode,
230+
final String eventName,
231+
final Boolean exists,
232+
final String originalUser,
233+
final Map<String, String> metadata) {
234+
if (mode == DISABLED) {
235+
return NoopFlow.INSTANCE;
236+
}
237+
final String user = anonymizeUser(mode, originalUser);
238+
if (user == null) {
239+
return NoopFlow.INSTANCE;
240+
}
241+
final AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
242+
if (ctx == null) {
243+
return NoopFlow.INSTANCE;
244+
}
245+
final TraceSegment segment = ctx_.getTraceSegment();
246+
247+
// span with ASM data
248+
segment.setTagTop(Tags.ASM_KEEP, true);
249+
segment.setTagTop(Tags.PROPAGATED_APPSEC, true);
250+
251+
// skip event if we have an SDK one
252+
if (mode != SDK) {
253+
segment.setTagTop("_dd.appsec.usr.login", user);
254+
segment.setTagTop("_dd.appsec.usr.id", user);
255+
segment.setTagTop(
256+
"_dd.appsec.events.users." + eventName + ".auto.mode", mode.fullName(), true);
257+
if (ctx.getUserLoginSource() == SDK) {
258+
return NoopFlow.INSTANCE;
176259
}
177-
if (addresses.isEmpty()) {
178-
// nothing to publish so short-circuit here
260+
} else {
261+
segment.setTagTop("_dd.appsec.events.users." + eventName + ".sdk", true, true);
262+
}
263+
264+
// update span tags
265+
segment.setTagTop("appsec.events.users." + eventName + ".usr.login", user, true);
266+
segment.setTagTop("appsec.events.users." + eventName + ".usr.id", user, true);
267+
segment.setTagTop("appsec.events.users." + eventName + ".track", true, true);
268+
if (exists != null) {
269+
segment.setTagTop("appsec.events.users." + eventName + ".usr.exists", exists, true);
270+
}
271+
if (metadata != null && !metadata.isEmpty()) {
272+
segment.setTagTop("appsec.events.users." + eventName, metadata, true);
273+
}
274+
275+
// update current context with new user login
276+
ctx.setUserLoginSource(mode);
277+
final boolean newUserLogin = !user.equals(ctx.getUserLogin());
278+
if (!newUserLogin) {
279+
return NoopFlow.INSTANCE;
280+
}
281+
ctx.setUserLogin(user);
282+
283+
// call waf if we have a new user login
284+
final List<Address<?>> addresses = new ArrayList<>(3);
285+
addresses.add(KnownAddresses.USER_LOGIN);
286+
addresses.add(KnownAddresses.USER_ID);
287+
if (KnownAddresses.LOGIN_SUCCESS.getKey().endsWith(eventName)) {
288+
addresses.add(KnownAddresses.LOGIN_SUCCESS);
289+
} else if (KnownAddresses.LOGIN_FAILURE.getKey().endsWith(eventName)) {
290+
addresses.add(KnownAddresses.LOGIN_FAILURE);
291+
}
292+
final MapDataBundle.Builder bundleBuilder =
293+
new MapDataBundle.Builder(addresses.size() == 2 ? CAPACITY_0_2 : CAPACITY_3_4);
294+
bundleBuilder.add(KnownAddresses.USER_ID, user);
295+
bundleBuilder.add(KnownAddresses.USER_LOGIN, user);
296+
if (addresses.size() == 3) {
297+
// we don't support null values for the address so we use an invalid placeholder here
298+
bundleBuilder.add(addresses.get(2), "invalid");
299+
}
300+
final DataBundle bundle = bundleBuilder.build();
301+
final String subInfoKey =
302+
addresses.stream().map(Address::getKey).collect(Collectors.joining("|"));
303+
while (true) {
304+
DataSubscriberInfo subInfo =
305+
loginEventSubInfo.computeIfAbsent(
306+
subInfoKey,
307+
t -> producerService.getDataSubscribers(addresses.toArray(new Address[0])));
308+
if (subInfo == null || subInfo.isEmpty()) {
179309
return NoopFlow.INSTANCE;
180310
}
181-
final Address<?>[] addressArray = addresses.toArray(new Address[0]);
182-
while (true) {
183-
DataSubscriberInfo subInfo =
184-
userIdSubInfo.computeIfAbsent(
185-
address, k -> producerService.getDataSubscribers(addressArray));
186-
if (subInfo == null || subInfo.isEmpty()) {
187-
return NoopFlow.INSTANCE;
188-
}
189-
MapDataBundle.Builder bundle = new MapDataBundle.Builder(CAPACITY_0_2);
190-
if (newUserId) {
191-
bundle.add(KnownAddresses.USER_ID, userId);
192-
}
193-
if (address != KnownAddresses.USER_ID) {
194-
// we don't support null values for the address so we use an invalid placeholder here
195-
bundle.add(address, "invalid");
196-
}
197-
try {
198-
GatewayContext gwCtx = new GatewayContext(false);
199-
return producerService.publishDataEvent(subInfo, ctx, bundle.build(), gwCtx);
200-
} catch (ExpiredSubscriberInfoException e) {
201-
userIdSubInfo.remove(address);
202-
}
311+
try {
312+
GatewayContext gwCtx = new GatewayContext(false);
313+
return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx);
314+
} catch (ExpiredSubscriberInfoException e) {
315+
loginEventSubInfo.remove(subInfoKey);
203316
}
204-
};
317+
}
205318
}
206319

207320
private Flow<Void> onRequestSession(final RequestContext ctx_, final String sessionId) {
@@ -994,6 +1107,33 @@ private static int byteToDigit(byte b) {
9941107
return -1;
9951108
}
9961109

1110+
protected static String anonymizeUser(final UserIdCollectionMode mode, final String userId) {
1111+
if (mode != ANONYMIZATION || userId == null) {
1112+
return userId;
1113+
}
1114+
MessageDigest digest;
1115+
try {
1116+
// TODO avoid lookup a new instance every time
1117+
digest = MessageDigest.getInstance("SHA-256");
1118+
} catch (NoSuchAlgorithmException e) {
1119+
if (!SHA_MISSING_REPORTED.getAndSet(true)) {
1120+
log.error(
1121+
SEND_TELEMETRY,
1122+
"Missing SHA-256 digest, user collection in 'anon' mode cannot continue",
1123+
e);
1124+
}
1125+
return null;
1126+
}
1127+
digest.update(userId.getBytes());
1128+
byte[] hash = digest.digest();
1129+
if (hash.length > HASH_SIZE_BYTES) {
1130+
byte[] temp = new byte[HASH_SIZE_BYTES];
1131+
System.arraycopy(hash, 0, temp, 0, temp.length);
1132+
hash = temp;
1133+
}
1134+
return ANON_PREFIX + toHexString(hash);
1135+
}
1136+
9971137
private static class IGAppSecEventDependencies {
9981138

9991139
private static final Map<Address<?>, Collection<datadog.trace.api.gateway.EventType<?>>>

0 commit comments

Comments
 (0)