Skip to content

Commit 470cce4

Browse files
Adam GoughAdam Gough
authored andcommitted
added ordering functionality back into index #970
1 parent 02d254c commit 470cce4

File tree

2 files changed

+76
-5
lines changed

2 files changed

+76
-5
lines changed

apps/sim/app/api/chat/utils.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -747,14 +747,13 @@ export async function executeWorkflowForChat(
747747
log.output.content = separator + formattedOutput
748748
processedOutputs.add(log.blockId)
749749

750-
// Stream non-streaming workflow block outputs as chunks so the client sees them
751-
// as a separate section, matching multi-agent behavior. Function synthetic streaming is
752-
// handled inside the executor; avoid duplicating here.
750+
// Stream non-streaming selected block outputs as chunks so the client sees them
751+
// as a separate section, matching multi-agent behavior.
753752
const isSelected = selectedOutputIds.some((id) => {
754753
const idBlock = id.includes('_') ? id.split('_')[0] : id.split('.')[0]
755754
return idBlock === blockIdForOutput
756755
})
757-
if ((log as any).blockType === 'workflow' && isSelected) {
756+
if (isSelected) {
758757
try {
759758
// Only emit separator into the stream if at least one block has already streamed
760759
if (streamedBlocks.size > 0) {
@@ -788,7 +787,7 @@ export async function executeWorkflowForChat(
788787
)
789788
} catch (e) {
790789
logger.warn(
791-
`[${requestId}] Failed to stream non-streaming output for function block ${blockIdForOutput}`,
790+
`[${requestId}] Failed to stream non-streaming output for selected block ${blockIdForOutput}`,
792791
e
793792
)
794793
}

apps/sim/executor/index.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1649,6 +1649,78 @@ export class Executor {
16491649

16501650
context.blockLogs.push(blockLog)
16511651

1652+
try {
1653+
const blockType = block.metadata?.id
1654+
const isStreamingEnabled = Boolean(context.stream)
1655+
const hasOnStream = typeof context.onStream === 'function'
1656+
const isSelected = (context.selectedOutputIds || []).some((outputId) => {
1657+
if (outputId === block.id) return true
1658+
const underscoreIdx = outputId.indexOf('_')
1659+
const dotIdx = outputId.indexOf('.')
1660+
if (underscoreIdx !== -1) return outputId.substring(0, underscoreIdx) === block.id
1661+
if (dotIdx !== -1) return outputId.substring(0, dotIdx) === block.id
1662+
return false
1663+
})
1664+
1665+
// Exclude infra/router/agent (agents already stream from handlers)
1666+
const excludedTypes = new Set([
1667+
BlockType.LOOP,
1668+
BlockType.PARALLEL,
1669+
BlockType.CONDITION,
1670+
BlockType.ROUTER,
1671+
BlockType.STARTER,
1672+
BlockType.WEBHOOK_TRIGGER,
1673+
BlockType.SCHEDULE,
1674+
BlockType.AGENT,
1675+
])
1676+
const isEligible = !!blockType && !excludedTypes.has(blockType as any)
1677+
1678+
if (isEligible && isStreamingEnabled && hasOnStream && isSelected) {
1679+
const out = output as any
1680+
let formatted: string
1681+
if (typeof out?.result !== 'undefined') {
1682+
formatted =
1683+
typeof out.result === 'string' ? out.result : JSON.stringify(out.result, null, 2)
1684+
} else if (typeof out?.content === 'string') {
1685+
formatted = out.content
1686+
} else if (typeof out?.stdout === 'string' && out.stdout) {
1687+
formatted = out.stdout
1688+
} else if (typeof out?.data !== 'undefined') {
1689+
formatted = typeof out.data === 'string' ? out.data : JSON.stringify(out.data, null, 2)
1690+
} else if (typeof out?.response?.data !== 'undefined') {
1691+
const d = out.response.data
1692+
formatted = typeof d === 'string' ? d : JSON.stringify(d, null, 2)
1693+
} else {
1694+
formatted = typeof out === 'string' ? out : JSON.stringify(out ?? {}, null, 2)
1695+
}
1696+
1697+
const syntheticStream = new ReadableStream({
1698+
start(controller) {
1699+
const enc = new TextEncoder()
1700+
controller.enqueue(enc.encode(formatted))
1701+
controller.close()
1702+
},
1703+
})
1704+
1705+
const se: StreamingExecution = {
1706+
stream: syntheticStream,
1707+
execution: {
1708+
success: true,
1709+
output: { content: formatted },
1710+
logs: [],
1711+
metadata: { duration: 0, startTime: new Date().toISOString() },
1712+
blockId: block.id,
1713+
blockType: blockType,
1714+
isStreaming: true,
1715+
},
1716+
}
1717+
1718+
await context.onStream?.(se)
1719+
}
1720+
} catch (emitError) {
1721+
logger.warn('Failed to emit synthetic streaming for non-streaming block:', emitError)
1722+
}
1723+
16521724
// Skip console logging for infrastructure blocks like loops and parallels
16531725
if (block.metadata?.id !== BlockType.LOOP && block.metadata?.id !== BlockType.PARALLEL) {
16541726
// Determine iteration context for this block

0 commit comments

Comments
 (0)