AppView Integration

AppViews index Stratos content by subscribing to the zone.stratos.sync.subscribeRecords WebSocket endpoint — similar to how AppViews subscribe to PDS firehoses, but scoped per-user.

Step 1: Service Authentication

AppViews authenticate using service auth — a signed JWT passed as the syncToken query parameter. The token must be a query param because Authorization headers are stripped by many WebSocket proxies and aren't supported in browser WebSocket APIs.

typescript
import type { Keypair } from '@atproto/crypto'
import { createServiceJwt } from '@atproto/xrpc-server'

async function mintSyncToken(
  appviewDid: string,
  stratosServiceDid: string,
  signingKey: Keypair,
): Promise<string> {
  // Mint a fresh token for every connection — tokens are short-lived.
  return createServiceJwt({
    iss: appviewDid,
    aud: stratosServiceDid,
    lxm: 'zone.stratos.sync.subscribeRecords',
    keypair: signingKey,
  })
}

Step 2: Subscribe to User Records

typescript
import WebSocket from 'ws'

async function subscribeToUser(
  appviewDid: string,
  stratosServiceDid: string,
  signingKey: Keypair,
  did: string,
  cursor?: number,
) {
  const syncToken = await mintSyncToken(
    appviewDid,
    stratosServiceDid,
    signingKey,
  )

  const url = new URL(
    'wss://stratos.example.com/xrpc/zone.stratos.sync.subscribeRecords',
  )
  url.searchParams.set('did', did)
  url.searchParams.set('syncToken', syncToken)
  if (cursor !== undefined) url.searchParams.set('cursor', cursor.toString())

  const ws = new WebSocket(url.toString())

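  ws.on('message', async (data) => {
    // decodeFrame, deleteRecord, and saveCursor are application-defined
    // helpers not shown here; indexRecord is defined in Step 3.
    const frame = decodeFrame(data)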

    if (frame.$type === 'zone.stratos.sync.subscribeRecords#commit') {
      for (const op of frame.ops) {
        if (op.action === 'create' || op.action === 'update') {
          await indexRecord(frame.did, op.path, op.record)
        } else if (op.action === 'delete') {
          await deleteRecord(frame.did, op.path)
        }
      }
      await saveCursor(did, frame.seq)
    }
  })
}
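
The handler above reads `$type`, `did`, `seq`, and `ops` from each decoded frame, but `decodeFrame` is not shown and the wire encoding is not specified here. One way to make that assumption explicit is a runtime type guard applied to whatever `decodeFrame` returns. This is a minimal sketch: the `RecordOp` and `CommitFrame` names are hypothetical, and the field names are inferred from the handler's usage rather than from a published schema.

```typescript
// Hypothetical types, inferred from the fields the handler reads.
interface RecordOp {
  action: 'create' | 'update' | 'delete'
  path: string // "<collection>/<rkey>"
  record?: unknown // assumed to be absent on delete
}

interface CommitFrame {
  $type: 'zone.stratos.sync.subscribeRecords#commit'
  did: string
  seq: number
  ops: RecordOp[]
}

const COMMIT_TYPE = 'zone.stratos.sync.subscribeRecords#commit'

function isRecordOp(value: unknown): value is RecordOp {
  if (typeof value !== 'object' || value === null) return false
  const op = value as Record<string, unknown>
  const knownAction =
    op.action === 'create' || op.action === 'update' || op.action === 'delete'
  return knownAction && typeof op.path === 'string'
}

// Returns true only when every field the handler depends on is present
// and has the expected primitive type.
function isCommitFrame(value: unknown): value is CommitFrame {
  if (typeof value !== 'object' || value === null) return false
  const frame = value as Record<string, unknown>
  return (
    frame.$type === COMMIT_TYPE &&
    typeof frame.did === 'string' &&
    typeof frame.seq === 'number' &&
    Array.isArray(frame.ops) &&
    frame.ops.every(isRecordOp)
  )
}
```

Checking `isCommitFrame(frame)` before iterating `frame.ops` lets malformed or unexpected frames be skipped or logged instead of throwing partway through a commit.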

Step 3: Index with Boundary Metadata

Store boundary domains with each record for filtering:

typescript
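// Minimal shape of the record fields this indexer reads.
interface StratosPostRecord {
  text?: string
  createdAt?: string
  boundary?: { values?: { value: string }[] }
}

async function indexRecord(
  did: string,
  path: string,
  record: StratosPostRecord,
) {
  // path has the form "<collection>/<rkey>"
  const [collection, rkey] = path.split('/')
  const uri = `at://${did}/${collection}/${rkey}`
  const boundary = record.boundary?.values?.map((d) => d.value) ?? []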

  await db
    .insertInto('stratos_posts')
    .values({
      uri,
      did,
      collection,
      rkey,
      text: record.text,
      boundary_domains: JSON.stringify(boundary),
      created_at: record.createdAt,
      indexed_at: new Date().toISOString(),
    })
    .onConflict((oc) =>
      oc.column('uri').doUpdateSet({
        text: record.text,
        boundary_domains: JSON.stringify(boundary),
      }),
    )
    .execute()
}
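
Records arrive from the network as untyped data, so the `boundary.values[].value` path read above may be missing or malformed. The following is a minimal sketch of a defensive extractor, assuming the nesting used in `indexRecord`; the `extractBoundaryDomains` name is hypothetical.

```typescript
// Walks record.boundary.values[].value and keeps only string values.
// Any missing or wrongly typed level yields an empty list instead of throwing.
function extractBoundaryDomains(record: unknown): string[] {
  if (typeof record !== 'object' || record === null) return []
  const boundary = (record as { boundary?: unknown }).boundary
  if (typeof boundary !== 'object' || boundary === null) return []
  const values = (boundary as { values?: unknown }).values
  if (!Array.isArray(values)) return []

  const domains: string[] = []
  for (const entry of values) {
    const value = (entry as { value?: unknown } | null)?.value
    if (typeof value === 'string') domains.push(value)
  }
  return domains
}
```

With this in place, `indexRecord` could compute `boundary` as `extractBoundaryDomains(record)` and store the result exactly as before.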

Step 4: Query with Boundary Filtering

When serving content, filter by the viewer's domain membership:

typescript
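import { sql } from 'kysely'

async function getAuthorFeed(
  authorDid: string,
  viewerDid: string,
  viewerDomains: string[],
) {
  return db
    .selectFrom('stratos_posts')
    .selectAll()
    .where('did', '=', authorDid)
    .where((eb) =>
      eb.or([
        // Authors can always see their own posts.
        eb('did', '=', viewerDid),
        // boundary_domains is a JSON array string, so match the quoted domain.
        ...viewerDomains.map((domain) => {
          const pattern = `%"${domain}"%`
          return sql<boolean>`boundary_domains LIKE ${pattern}`
        }),
      ]),
    )
    .orderBy('created_at', 'desc')
    .limit(50)
    .execute()
}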
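
The filter in the query above reduces to a simple rule: a post is visible when the viewer is its author, or when the viewer belongs to at least one of the post's boundary domains. The same rule can be sketched in memory, which is useful for unit tests or for re-checking rows after they are fetched. The `canView` name is hypothetical.

```typescript
// Mirrors the SQL filter: author always sees own posts; otherwise the viewer
// needs at least one domain in common with the post's boundary.
function canView(
  viewerDid: string,
  authorDid: string,
  viewerDomains: string[],
  boundaryDomains: string[],
): boolean {
  if (viewerDid === authorDid) return true
  const allowed = new Set(viewerDomains)
  return boundaryDomains.some((domain) => allowed.has(domain))
}
```

Under this rule a post with an empty boundary is visible only to its author. The in-memory check compares domains exactly, whereas SQL `LIKE` treats `%` and `_` inside a domain as wildcards, so domain strings should be validated before they reach the query.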

Step 5: Determine Viewer's Domains

The AppView needs to know what domains a viewer belongs to.

Option A — Index from Stratos posts:

typescript
async function getViewerDomains(viewerDid: string): Promise<string[]> {
  const result = await db
    .selectFrom('stratos_posts')
    .select('boundary_domains')
    .where('did', '=', viewerDid)
    .execute()

  const domains = new Set<string>()
  for (const row of result) {
    for (const domain of JSON.parse(row.boundary_domains)) {
      domains.add(domain)
    }
  }
  return [...domains]
}

Option B — Community registry lookup: Query a community service for verified domain membership.
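
No registry API is defined here, so the following is only a sketch of how a lookup might be wrapped once one exists. It assumes a hypothetical `RegistryLookup` function that resolves a DID to its verified domains, and adds a short-lived cache so that feed requests do not hit the registry every time. The class name, the TTL default, and the injected clock are all illustrative choices.

```typescript
// Hypothetical: resolves a DID to its verified domains via some registry.
type RegistryLookup = (did: string) => Promise<string[]>

class CachedDomainResolver {
  private cache = new Map<string, { domains: string[]; expiresAt: number }>()
  private lookup: RegistryLookup
  private ttlMs: number
  private now: () => number

  constructor(
    lookup: RegistryLookup,
    ttlMs = 60_000,
    now: () => number = Date.now, // injectable clock, mainly for tests
  ) {
    this.lookup = lookup
    this.ttlMs = ttlMs
    this.now = now
  }

  // Returns cached domains while the entry is fresh; otherwise asks the
  // registry again and stores the new result.
  async resolve(did: string): Promise<string[]> {
    const hit = this.cache.get(did)
    if (hit !== undefined && hit.expiresAt > this.now()) return hit.domains
    const domains = await this.lookup(did)
    this.cache.set(did, { domains, expiresAt: this.now() + this.ttlMs })
    return domains
  }
}
```

A short TTL bounds how long a revoked membership can keep granting access, at the cost of more registry traffic.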

Complete Indexer Example

typescript
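import WebSocket from 'ws'
import type { Kysely } from 'kysely'
import type { Keypair } from '@atproto/crypto'
import { createServiceJwt } from '@atproto/xrpc-server'

// AppViewDb is the application's Kysely schema type. The loadCursor,
// saveCursor, processOp, and decodeEvent methods are omitted for brevity.
class StratosIndexer {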
  private cursors = new Map<string, number>()

  constructor(
    private db: Kysely<AppViewDb>,
    private stratosEndpoint: string,
    private appviewDid: string,
    private stratosServiceDid: string,
    private signingKey: Keypair,
  ) {}

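  async startIndexing(enrolledDids: string[]) {
    for (const did of enrolledDids) {
      const cursor = await this.loadCursor(did)
      // Seed the in-memory cursor so that a reconnect before the first
      // commit resumes from the persisted position.
      if (cursor !== undefined) this.cursors.set(did, cursor)
      // Not awaited: each subscription runs for the life of the process.
      void this.subscribeToUser(did, cursor)
    }
  }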

  private async subscribeToUser(did: string, cursor?: number) {
    // Mint a fresh JWT on every connection.
    const syncToken = await createServiceJwt({
      iss: this.appviewDid,
      aud: this.stratosServiceDid,
      lxm: 'zone.stratos.sync.subscribeRecords',
      keypair: this.signingKey,
    })

    const url = new URL(
      `${this.stratosEndpoint}/xrpc/zone.stratos.sync.subscribeRecords`,
    )
    url.searchParams.set('did', did)
    url.searchParams.set('syncToken', syncToken)
    if (cursor !== undefined) url.searchParams.set('cursor', cursor.toString())

    const ws = new WebSocket(url.toString())

    ws.on('message', async (data) => {
      const event = this.decodeEvent(data)

      if (event.$type === 'zone.stratos.sync.subscribeRecords#info') {
        if (event.name === 'OutdatedCursor') {
          this.cursors.delete(did)
        }
        return
      }

      if (event.$type === 'zone.stratos.sync.subscribeRecords#commit') {
        await this.db.transaction().execute(async (tx) => {
          for (const op of event.ops) {
            await this.processOp(tx, event.did, op)
          }
          await this.saveCursor(tx, event.did, event.seq)
        })
        this.cursors.set(did, event.seq)
      }
    })

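    ws.on('error', (err) => {
      // Without an 'error' listener, socket errors become uncaught exceptions.
      // A 'close' event normally follows and triggers the reconnect below.
      console.error(`Stratos subscription error for ${did}:`, err)
    })

    ws.on('close', () => {
      // Reconnect with a fresh token after a short delay.
      setTimeout(
        () => void this.subscribeToUser(did, this.cursors.get(did)),
        5000,
      )
    })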
  }
}
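
The indexer above reconnects after a fixed five-second delay. When many per-user subscriptions drop at once, for example during a Stratos restart, a fixed delay makes them all reconnect at the same moment. A common alternative, not part of the original example, is exponential backoff with jitter. The sketch below is one way to compute such a delay; the function name and the default values are assumptions.

```typescript
// Exponential backoff with "equal jitter": the delay doubles with each failed
// attempt up to maxMs, and the returned value is spread over the upper half
// of that window so that clients do not reconnect in lockstep.
function reconnectDelayMs(
  attempt: number, // 0 for the first retry
  baseMs = 1000,
  maxMs = 60_000,
  random: () => number = Math.random, // injectable for deterministic tests
): number {
  const ceiling = Math.min(maxMs, baseMs * 2 ** Math.max(0, attempt))
  return Math.floor(ceiling / 2 + random() * (ceiling / 2))
}
```

To use it, the indexer would keep a per-DID attempt counter, pass it to `reconnectDelayMs` in the `close` handler in place of the constant `5000`, and reset the counter once a commit is processed successfully.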