diff --git a/backend/src/database/migrations/U1733322265__anonymizeMemberIdentities.sql b/backend/src/database/migrations/U1733322265__anonymizeMemberIdentities.sql
new file mode 100644
index 0000000000..4eb9d325df
--- /dev/null
+++ b/backend/src/database/migrations/U1733322265__anonymizeMemberIdentities.sql
@@ -0,0 +1,2 @@
+-- Undo of V1733322265: that migration only adds the "memberId" column, so the undo only drops it.
+alter table "requestedForErasureMemberIdentities" drop column if exists "memberId";
\ No newline at end of file
diff --git a/backend/src/database/migrations/V1733322265__anonymizeMemberIdentities.sql b/backend/src/database/migrations/V1733322265__anonymizeMemberIdentities.sql
new file mode 100644
index 0000000000..5c79292043
--- /dev/null
+++ b/backend/src/database/migrations/V1733322265__anonymizeMemberIdentities.sql
@@ -0,0 +1,2 @@
+-- Links an erasure request to the anonymized member it belongs to; stays null until that member exists.
+alter table "requestedForErasureMemberIdentities" add column "memberId" uuid;
\ No newline at end of file
diff --git a/services/apps/data_sink_worker/src/bin/anonymize-member.ts b/services/apps/data_sink_worker/src/bin/anonymize-member.ts
new file mode 100644
index 0000000000..80cef6044e
--- /dev/null
+++ b/services/apps/data_sink_worker/src/bin/anonymize-member.ts
@@ -0,0 +1,327 @@
+import { randomUUID } from 'crypto'
+import fs from 'fs'
+import path from 'path'
+
+import {
+  PriorityLevelContextRepository,
+  QueuePriorityContextLoader,
+  SearchSyncWorkerEmitter,
+} from '@crowd/common_services'
+import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database'
+import { anonymizeUsername } from '@crowd/data-access-layer/src/gdpr'
+import { getServiceChildLogger } from '@crowd/logging'
+import { QueueFactory } from '@crowd/queue'
+import { getRedisClient } from '@crowd/redis'
+import { IMemberIdentity, MemberIdentityType } from '@crowd/types'
+
+import { DB_CONFIG, QUEUE_CONFIG, REDIS_CONFIG } from '../conf'
+
+/**
+ * CLI script that anonymizes member data in the database.
+ *
+ * Arguments are `<type> <value>` pairs:
+ *  - `ids`: comma separated member ids - every identity of these members is anonymized
+ *  - `email` / `<platform>`: matching rows of "memberIdentities" are anonymized
+ *  - `name`: members with that display name are only written to a file for a manual check
+ *
+ * When at least one `ids` pair is present, all other pairs are ignored.
+ */
+
+const log = getServiceChildLogger('anonymize-member')
+
+const processArguments = process.argv.slice(2)
+
+if (processArguments.length === 0 || processArguments.length % 2 !== 0) {
+  log.error(
+    `
+    Expected argument in pairs which can be any of the following:
+      - ids "<memberId1>, <memberId2>, ..."
+      - email john@doe.com
+      - name "John Doe"
+      - <platform> <value> (e.g. lfid someusername)
+    `,
+  )
+  process.exit(1)
+}
+
+setImmediate(async () => {
+  try {
+    const failures = await run()
+    // exit non-zero when something could not be anonymized so a partial run is noticed
+    process.exit(failures > 0 ? 1 : 0)
+  } catch (err) {
+    log.error(err, 'Anonymize member script failed!')
+    process.exit(1)
+  }
+})
+
+/**
+ * Parses the command line pairs and anonymizes the requested members / identities.
+ *
+ * @returns the number of members or identities that failed to be anonymized
+ */
+async function run(): Promise<number> {
+  const manualCheckFile = `manual_check_member_ids.txt`
+  const dbConnection = await getDbConnection(DB_CONFIG())
+  const store = new DbStore(log, dbConnection)
+  const queueClient = QueueFactory.createQueueService(QUEUE_CONFIG())
+  const redisClient = await getRedisClient(REDIS_CONFIG())
+  const priorityLevelRepo = new PriorityLevelContextRepository(new DbStore(log, dbConnection), log)
+  const loader: QueuePriorityContextLoader = (tenantId: string) =>
+    priorityLevelRepo.loadPriorityLevelContext(tenantId)
+
+  const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(queueClient, redisClient, loader, log)
+  await searchSyncWorkerEmitter.init()
+
+  const pairs: { type: string; value: string }[] = []
+  for (let i = 0; i < processArguments.length; i += 2) {
+    pairs.push({
+      type: processArguments[i],
+      value: processArguments[i + 1],
+    })
+  }
+
+  log.info(
+    `Anonymizing member based on input data: [${pairs
+      .map((p) => `${p.type} "${p.value}"`)
+      .join(', ')}]`,
+  )
+
+  // collect the member ids of every `ids` pair, skipping blanks (trailing comma) and duplicates
+  const idsToAnonymize: string[] = []
+  for (const param of pairs.filter((p) => p.type === 'ids')) {
+    for (const rawId of param.value.split(',')) {
+      const id = rawId.trim()
+      if (id.length > 0 && !idsToAnonymize.includes(id)) {
+        idsToAnonymize.push(id)
+      }
+    }
+  }
+
+  let failures = 0
+
+  if (idsToAnonymize.length > 0) {
+    for (const memberId of idsToAnonymize) {
+      try {
+        const memberData = await store
+          .connection()
+          .one(`select * from members where id = $(memberId)`, { memberId })
+
+        log.info({ tenantId: memberData.tenantId }, 'ANONYMIZING MEMBER DATA...')
+
+        await store.transactionally(async (t) => {
+          // use the store handed to the callback (not the outer one) so statements run on the transaction
+          // Get all identities for the member
+          const identities = await t
+            .connection()
+            .any(`select * from "memberIdentities" where "memberId" = $(memberId)`, { memberId })
+
+          // Anonymize each identity and update the database
+          for (const identity of identities) {
+            const hashedUsername = anonymizeUsername(
+              identity.value,
+              identity.platform,
+              identity.type,
+            )
+
+            await anonymizeMemberInDb(t, identity, hashedUsername)
+          }
+        })
+
+        // trigger the sync only after the transaction callback has finished successfully
+        await searchSyncWorkerEmitter.triggerMemberSync(memberData.tenantId, memberId, true)
+      } catch (err) {
+        failures++
+        log.error(err, { memberId }, 'Failed to anonymize member!')
+      }
+    }
+  } else {
+    const nameIdentity = pairs.find((p) => p.type === 'name')
+    const otherIdentities = pairs.filter((p) => p.type !== 'name')
+
+    if (otherIdentities.length > 0) {
+      const conditions: string[] = []
+      const params: Record<string, string> = {}
+      otherIdentities.forEach((pair, index) => {
+        params[`value_${index}`] = pair.value
+        if (pair.type === 'email') {
+          conditions.push(
+            `(type = '${MemberIdentityType.EMAIL}' and lower(value) = lower($(value_${index})))`,
+          )
+        } else {
+          // any other pair type is treated as a platform name
+          params[`platform_${index}`] = pair.type.toLowerCase()
+          conditions.push(
+            `(platform = $(platform_${index}) and lower(value) = lower($(value_${index})))`,
+          )
+        }
+      })
+
+      const query = `select * from "memberIdentities" where ${conditions.join(' or ')}`
+      const existingIdentities = await store.connection().any(query, params)
+
+      if (existingIdentities.length > 0) {
+        log.info(`Found ${existingIdentities.length} existing identities to anonymize.`)
+
+        for (const identity of existingIdentities) {
+          try {
+            await store.transactionally(async (t) => {
+              const hashedUsername = anonymizeUsername(
+                identity.value,
+                identity.platform,
+                identity.type,
+              )
+
+              // the identity lookup never touched members."displayName", keep it that way
+              await anonymizeMemberInDb(t, identity, hashedUsername, {
+                updateDisplayName: false,
+              })
+            })
+
+            await searchSyncWorkerEmitter.triggerMemberSync(
+              identity.tenantId,
+              identity.memberId,
+              true,
+            )
+          } catch (err) {
+            failures++
+            // log only non-identifying fields - the raw identity value is what is being erased
+            log.error(
+              err,
+              { memberId: identity.memberId, platform: identity.platform, type: identity.type },
+              'Failed to anonymize member identity!',
+            )
+          }
+        }
+      }
+    }
+
+    if (nameIdentity) {
+      const results = await store
+        .connection()
+        .any(`select id from members where lower("displayName") = lower($(name))`, {
+          name: nameIdentity.value.trim(),
+        })
+
+      if (results.length > 0) {
+        const memberIds = results.map((r) => r.id).join(', ')
+
+        addLinesToFile(manualCheckFile, [`name: ${nameIdentity.value}, member ids: [${memberIds}]`])
+        log.warn(
+          `Found ${results.length} members with name: ${nameIdentity.value}! Manual check required for member ids: [${memberIds}]!`,
+        )
+      }
+    }
+  }
+
+  return failures
+}
+
+/**
+ * Appends the given lines to a file, creating the file and its parent folders when missing.
+ *
+ * @param filePath path of the file to append to
+ * @param lines lines to append; the joined text is terminated with a newline
+ * @throws rethrows any file system error after logging it
+ */
+function addLinesToFile(filePath: string, lines: string[]) {
+  try {
+    fs.mkdirSync(path.dirname(filePath), { recursive: true })
+    // appendFileSync creates the file when it does not exist yet
+    fs.appendFileSync(filePath, lines.join('\n') + '\n')
+  } catch (err) {
+    log.error(err, { filePath }, 'Error while writing to file!')
+    throw err
+  }
+}
+
+/**
+ * Replaces one identity of a member with its anonymized value and records the erasure request.
+ *
+ * @param store store whose connection runs the statements (pass the transactional store)
+ * @param identity identity row to anonymize; `identity.value` must still be the original value
+ * @param hashedUsername anonymized replacement for `identity.value`
+ * @param options `updateDisplayName` (default true) also overwrites members."displayName"
+ */
+async function anonymizeMemberInDb(
+  store: DbStore,
+  identity: IMemberIdentity,
+  hashedUsername: string,
+  options: { updateDisplayName?: boolean } = {},
+) {
+  const { updateDisplayName = true } = options
+  const db = store.connection()
+
+  if (updateDisplayName) {
+    // Update member details
+    // todo: cleanup original member data in members table
+    await db.none(
+      `update members
+      set "displayName" = $(hashedValue)
+      where id = $(memberId)`,
+      {
+        hashedValue: hashedUsername,
+        memberId: identity.memberId,
+      },
+    )
+  }
+
+  // Update memberIdentities table
+  // matching on the original value keeps other identities of the same platform/type untouched
+  await db.none(
+    `update "memberIdentities"
+    set value = $(hashedValue)
+    where "memberId" = $(memberId)
+      and platform = $(platform)
+      and type = $(type)
+      and value = $(originalValue)`,
+    {
+      hashedValue: hashedUsername,
+      memberId: identity.memberId,
+      platform: identity.platform,
+      type: identity.type,
+      originalValue: identity.value,
+    },
+  )
+
+  // Add to requestedForErasureMemberIdentities
+  // each row gets its own id: reusing the member id would (assuming id is unique) collide for
+  // every identity after the first, and `on conflict do nothing` would then silently drop it
+  await db.none(
+    `insert into "requestedForErasureMemberIdentities"
+    (id, platform, type, value, "memberId")
+    values ($(id), $(platform), $(type), $(value), $(memberId))
+    on conflict do nothing`,
+    {
+      id: randomUUID(),
+      platform: identity.platform,
+      type: identity.type,
+      value: hashedUsername,
+      memberId: identity.memberId,
+    },
+  )
+
+  // Update activities table
+  await db.none(
+    `update activities
+    set "objectMemberUsername" = $(hashedValue)
+    where "objectMemberId" = $(memberId)`,
+    {
+      hashedValue: hashedUsername,
+      memberId: identity.memberId,
+    },
+  )
+
+  // Update activities table for member activities
+  // note: this rewrites every activity of the member, so with several identities the hash of
+  // the identity processed last is the one that stays
+  await db.none(
+    `update activities
+    set username = $(hashedValue)
+    where "memberId" = $(memberId)`,
+    {
+      hashedValue: hashedUsername,
+      memberId: identity.memberId,
+    },
+  )
+}
diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts
index 9a45551033..2272b836bb 100644
--- a/services/apps/data_sink_worker/src/service/activity.service.ts
+++ b/services/apps/data_sink_worker/src/service/activity.service.ts
@@ -443,10 +443,43 @@ export default class ActivityService extends LoggerBase {
         throw new Error('Activity does not have a username or member.')
       }
 
+      let member = activity.member
+
+      const erasureRepo = new RequestedForErasureMemberIdentitiesRepository(this.pgStore, this.log)
+
+      const anonymizedMember = await erasureRepo.anonymizeMemberIfRequested(activity.member)
+
+      if (anonymizedMember) {
+        member = anonymizedMember
+
+        this.log.warn(
+          { memberIdentities: member.identities },
+          'Member has identities that were requested to be erased by the user, so anonymized instead!',
+        )
+      }
+
       let username = activity.username
+
+      // check if activity.username was requested to be erased
+      const anonymizeUsernameIfRequested = await erasureRepo.getAnonymizationRequest({
+        value: activity.username,
+        platform,
+        type: MemberIdentityType.USERNAME,
+        hashValue: true,
+      })
+
+      if (anonymizeUsernameIfRequested) {
+        username = anonymizeUsernameIfRequested.value
+        // explicitly set the displayName to the anonymized username
+        // (member is optional here - the `!member` fallback below handles a missing one)
+        if (member) {
+          member.displayName = username
+        }
+      }
+
       if (!username) {
         const identity = singleOrDefault(
-          activity.member.identities,
+          member.identities,
           (i) => i.platform === platform && i.type === MemberIdentityType.USERNAME,
         )
         if (!identity) {
@@ -467,7 +500,6 @@ export default class ActivityService extends LoggerBase {
         username = identity.value
       }
 
-      let member = activity.member
       if (!member) {
         member = {
           identities: [
@@ -485,9 +517,38 @@ export default class ActivityService extends LoggerBase {
         member.attributes = {}
       }
 
-      let objectMemberUsername = activity.objectMemberUsername
       let objectMember = activity.objectMember
 
+      const anonymizeObjectMemberIfRequested =
+        await erasureRepo.anonymizeMemberIfRequested(objectMember)
+
+      if (anonymizeObjectMemberIfRequested) {
+        objectMember = anonymizeObjectMemberIfRequested
+
+        this.log.warn(
+          { objectMemberIdentities: objectMember.identities },
+          'Object member has identities that were requested to be erased by the user, so anonymized instead!',
+        )
+      }
+
+      let objectMemberUsername = activity.objectMemberUsername
+
+      const anonymizeObjectMemberUsernameIfRequested = await erasureRepo.getAnonymizationRequest({
+        value: objectMemberUsername,
+        platform,
+        type: MemberIdentityType.USERNAME,
+        hashValue: true,
+      })
+
+      if (anonymizeObjectMemberUsernameIfRequested) {
+        objectMemberUsername = anonymizeObjectMemberUsernameIfRequested.value
+        // explicitly set the displayName to the anonymized username
+        // (objectMember is optional - an activity may carry only objectMemberUsername)
+        if (objectMember) {
+          objectMember.displayName = objectMemberUsername
+        }
+      }
+
       if (objectMember && !objectMemberUsername) {
         const identity = singleOrDefault(
           objectMember.identities,
@@ -514,75 +575,6 @@ export default class ActivityService extends LoggerBase {
         }
       }
 
-      const repo = new RequestedForErasureMemberIdentitiesRepository(this.pgStore, this.log)
-
-      // check if member or object member have identities that were requested to be erased by the user
-      if (member && member.identities.length > 0) {
-        const toErase = await repo.someIdentitiesWereErasedByUserRequest(member.identities)
-        if (toErase.length > 0) {
-          // prevent member/activity creation of one of the identities that are marked to be erased are verified
-          if (toErase.some((i) => i.verified)) {
-            this.log.warn(
-              { memberIdentities: member.identities },
-              'Member has identities that were requested to be erased by the user! Skipping activity processing!',
-            )
-            return
-          } else {
-            // we just remove the unverified identities that were marked to be erased and prevent them from being created
-            member.identities = member.identities.filter((i) => {
-              if (i.verified) return true
-
-              const maybeToErase = toErase.find(
-                (e) => e.type === i.type && e.value === i.value && e.platform === i.platform,
-              )
-
-              if (maybeToErase) return false
-              return true
-            })
-
-            if (member.identities.filter((i) => i.value).length === 0) {
-              this.log.warn(
-                'Member had at least one unverified identity removed as it was requested to be removed! Now there is no identities left - skipping processing!',
-              )
-              return
-            }
-          }
-        }
-      }
-
-      if (objectMember && objectMember.identities.length > 0) {
-        const toErase = await repo.someIdentitiesWereErasedByUserRequest(objectMember.identities)
-        if (toErase.length > 0) {
-          // prevent member/activity creation of one of the identities that are marked to be erased are verified
-          if (toErase.some((i) => i.verified)) {
-            this.log.warn(
-              { objectMemberIdentities: objectMember.identities },
-              'Object member has identities that were requested to be erased by the user! Skipping activity processing!',
-            )
-            return
-          } else {
-            // we just remove the unverified identities that were marked to be erased and prevent them from being created
-            objectMember.identities = objectMember.identities.filter((i) => {
-              if (i.verified) return true
-
-              const maybeToErase = toErase.find(
-                (e) => e.type === i.type && e.value === i.value && e.platform === i.platform,
-              )
-
-              if (maybeToErase) return false
-              return true
-            })
-
-            if (objectMember.identities.filter((i) => i.value).length === 0) {
-              this.log.warn(
-                'Object member had at least one unverified identity removed as it was requested to be removed! Now there is no identities left - skipping processing!',
-              )
-              return
-            }
-          }
-        }
-      }
-
       let memberId: string
       let objectMemberId: string | undefined
       let memberIsBot = false
@@ -1021,6 +1013,13 @@ export default class ActivityService extends LoggerBase {
               platform,
               false,
             )
+
+            // If this was an anonymized member, update the erasure table with the new memberId
+            if (anonymizedMember) {
+              for (const identity of member.identities) {
+                await erasureRepo.updateErasureRequestMemberId(identity, memberId)
+              }
+            }
           }
           // determine isBot and isTeamMember
           memberIsBot = (memberAttValue(MemberAttributeName.IS_BOT) as boolean) ?? false
@@ -1079,11 +1078,18 @@ export default class ActivityService extends LoggerBase {
                     : new Date(activity.timestamp),
                   identities: objectMember.identities,
                   organizations: objectMember.organizations,
-                  reach: member.reach,
+                  reach: objectMember.reach,
                 },
                 platform,
                 false,
               )
+
+              // If this was an anonymized object member, update the erasure table with the new memberId
+              if (anonymizeObjectMemberIfRequested) {
+                for (const identity of objectMember.identities) {
+                  await erasureRepo.updateErasureRequestMemberId(identity, objectMemberId)
+                }
+              }
             }
           }
         }
diff --git a/services/libs/data-access-layer/src/gdpr/index.ts b/services/libs/data-access-layer/src/gdpr/index.ts
new file mode 100644
index 0000000000..d8a676e7b7
--- /dev/null
+++ b/services/libs/data-access-layer/src/gdpr/index.ts
@@ -0,0 +1,27 @@
+import crypto from 'crypto'
+
+/**
+ * Creates a deterministic anonymized username from the original username, platform and type.
+ *
+ * The same input always maps to the same output, which is what lets incoming data be matched
+ * against already anonymized identities. Only 8 hex characters (32 bits) of the hash are kept,
+ * so two different inputs can end up with the same anonymized value.
+ *
+ * @param username The original username to anonymize
+ * @param platform The platform the username belongs to
+ * @param type The type of identity (e.g. 'username', 'email')
+ * @returns A consistently hashed anonymous identity in the format 'anon_user_<hash>'
+ */
+export function anonymizeUsername(username: string, platform: string, type: string): string {
+  // hash using SHA-256 for strong one-way hashing
+  const hash = crypto
+    .createHash('sha256')
+    // all inputs to lowercase and combine them for hashing
+    .update(`${username.toLowerCase()}-${platform.toLowerCase()}-${type.toLowerCase()}`)
+    // Get first 8 characters of the hex hash for brevity
+    .digest('hex')
+    .slice(0, 8)
+
+  // anonymized username in a consistent format
+  return `anon_user_${hash}`
+}
diff --git a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo.ts b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo.ts
index 2fd9f1508c..1def2d87bc 100644
--- a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo.ts
+++ b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo.ts
@@ -1,6 +1,9 @@
+import { generateUUIDv1 } from '@crowd/common'
 import { DbStore, RepositoryBase } from '@crowd/database'
 import { Logger } from '@crowd/logging'
-import { IMemberIdentity, MemberIdentityType } from '@crowd/types'
+import { IMemberData, IMemberIdentity, MemberIdentityType } from '@crowd/types'
+
+import { anonymizeUsername } from '../../../../gdpr'
 
 export default class RequestedForErasureMemberIdentitiesRepository extends RepositoryBase<RequestedForErasureMemberIdentitiesRepository> {
   constructor(store: DbStore, parentLog: Logger) {
@@ -54,4 +57,189 @@ export default class RequestedForErasureMemberIdentitiesRepository extends Repos
 
     return await this.db().oneOrNone(query, params)
   }
+
+  /**
+   * Stores an already anonymized identity in the erasure table.
+   *
+   * @param platform platform of the identity
+   * @param type type of the identity
+   * @param value anonymized identity value
+   * @param memberId member the request belongs to, or null when that member does not exist yet
+   */
+  private async insertIdentityForErasureRequest({
+    platform,
+    type,
+    value,
+    memberId,
+  }): Promise<void> {
+    // "memberId" must be quoted - postgres folds unquoted identifiers to lower case
+    // `on conflict do nothing` matches the inserts done by the anonymize-member script
+    const query = `
+      insert into "requestedForErasureMemberIdentities" (id, platform, type, value, "memberId")
+      values ($(id), $(platform), $(type), $(value), $(memberId))
+      on conflict do nothing
+    `
+
+    await this.db().none(query, {
+      id: generateUUIDv1(),
+      platform,
+      type,
+      value,
+      memberId,
+    })
+  }
+
+  /**
+   * Links the erasure request of an anonymized identity to a member.
+   *
+   * @param identity anonymized identity; matched on value, platform and type
+   * @param memberId id of the member to store on the request
+   */
+  public async updateErasureRequestMemberId(
+    identity: IMemberIdentity,
+    memberId: string,
+  ): Promise<void> {
+    const query = `
+      update "requestedForErasureMemberIdentities"
+      set "memberId" = $(memberId)
+      where value = $(value) and platform = $(platform) and type = $(type)
+    `
+
+    await this.db().none(query, {
+      memberId,
+      value: identity.value,
+      platform: identity.platform,
+      type: identity.type,
+    })
+  }
+
+  /**
+   * Looks up the erasure request for an identity.
+   *
+   * @param value identity value; already anonymized unless `hashValue` is set
+   * @param platform platform of the identity
+   * @param type type of the identity
+   * @param hashValue when true, `value` is anonymized before the lookup
+   * @returns id and memberId of the request plus the anonymized value, or null when none exists
+   */
+  public async getAnonymizationRequest({
+    value,
+    platform,
+    type,
+    hashValue = false,
+  }): Promise<{ id: string; memberId: string; value: string } | null> {
+    // nothing to hash or look up without a value (e.g. activities without objectMemberUsername)
+    if (!value) {
+      return null
+    }
+
+    // hash the value only if flag is set
+    if (hashValue) {
+      value = anonymizeUsername(value, platform, type)
+    }
+
+    const result = await this.db().oneOrNone(
+      `
+      select r.id, r."memberId"
+      from "requestedForErasureMemberIdentities" r
+      where r.value = $(value) and r."platform" = $(platform) and r."type" = $(type)
+      limit 1
+    `,
+      {
+        value,
+        platform,
+        type,
+      },
+    )
+
+    if (!result) {
+      return null
+    }
+
+    return {
+      ...result,
+      value,
+    }
+  }
+
+  /**
+   * Anonymizes a member when at least one of its identities has an erasure request.
+   *
+   * Identities of that member without a request yet get one, so they stay anonymized too.
+   *
+   * @param member member data as received; may be missing
+   * @returns a copy of the member with anonymized identities and display name and emptied
+   * attributes, or null when no identity has an erasure request
+   */
+  public async anonymizeMemberIfRequested(member: IMemberData): Promise<IMemberData | null> {
+    // callers pass optional members straight through, so tolerate a missing member
+    if (!member || !member.identities || member.identities.length === 0) {
+      return null
+    }
+
+    const identitiesRequestedForAnonymization = []
+    const otherIdentitiesYetToBeAnonymized = []
+    let existingAnonymousMemberId: string | null = null
+
+    // identities without a value cannot be hashed and are left as they are
+    const hashedIdentities = member.identities.map((identity) => ({
+      ...identity,
+      value: identity.value
+        ? anonymizeUsername(identity.value, identity.platform, identity.type)
+        : identity.value,
+    }))
+
+    // check all identities for anonymization request
+    for (const identity of hashedIdentities) {
+      if (!identity.value) {
+        continue
+      }
+
+      const anonReq = await this.getAnonymizationRequest({
+        value: identity.value,
+        platform: identity.platform,
+        type: identity.type,
+      })
+
+      if (anonReq) {
+        // identity is marked for anonymization
+        identitiesRequestedForAnonymization.push(anonReq)
+
+        if (anonReq.memberId) {
+          existingAnonymousMemberId = anonReq.memberId
+        }
+      } else {
+        otherIdentitiesYetToBeAnonymized.push(identity)
+      }
+    }
+
+    // no identities are marked for anonymization
+    if (identitiesRequestedForAnonymization.length === 0) {
+      return null
+    }
+
+    // when a member is blacklisted, we need to ensure all their identities are also blacklisted
+    // eg: If github:johndoe is blacklisted, also blacklist linkedin:john-doe and email:john@doe.com.
+    await Promise.all(
+      otherIdentitiesYetToBeAnonymized.map((identity) =>
+        this.insertIdentityForErasureRequest({
+          platform: identity.platform,
+          type: identity.type,
+          value: identity.value,
+          // we use the existing anonymous memberId if available to link identities to the same member
+          // This happens when a blacklisted identity had no member in db at the time of blacklisting
+          // If no memberId exists, we set it to null and handle it during processActivity member creation
+          memberId: existingAnonymousMemberId,
+        }),
+      ),
+    )
+
+    return {
+      ...member,
+      // the original display name must not survive anonymization
+      displayName: identitiesRequestedForAnonymization[0].value,
+      identities: hashedIdentities,
+      attributes: {},
+    }
+  }
 }