这里测试的是新增变量聚合节点 ,将原来的功能复制一下,本文只是为了学习如何新增新的节点。
api/core/workflow/nodes
这里是节点代码的存放位置
variable_aggregator_aaa
entities.py
,用存放实体类#!/usr/bin/python3
# @Author: gitsilence
# @Time: 2025/3/6 21:35
from typing import Literal, Optional
from pydantic import BaseModel
from core.workflow.nodes.base import BaseNodeData
class AdvancedSettingsAAA(BaseModel):
"""
Advanced settings aaa
"""
group_enabled: bool
class Group(BaseModel):
"""
Group
"""
# output_type的值必须是下面字符串其中之一,可以看作字符串枚举
output_type: Literal["string", "number", "object", "array[string]", "array[number]", "array[object]"]
variables: list[list[str]]
group_name: str
class VariableAssignerNodeDataAAA(BaseNodeData):
"""
Knowledge retrieval Node Data
"""
type: str = "variable-assigner-aaa"
output_type: str
variables: list[list[str]]
advanced_settings: Optional[AdvancedSettingsAAA] = None
pass
这里定义组件使用的参数实体,字段的值类型、默认值
variable_aggregator_node_aaa.py
#!/usr/bin/python3
# @Author: gitsilence
# @Time: 2025/3/6 21:33
from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.nodes.base import BaseNode
from core.workflow.nodes.enums import NodeType
from core.workflow.nodes.variable_aggregator_aaa.entities import VariableAssignerNodeDataAAA
from models.workflow import WorkflowNodeExecutionStatus
class VariableAggregatorNodeAAA(BaseNode[VariableAssignerNodeDataAAA]):
_node_data_cls = VariableAssignerNodeDataAAA
_node_type = NodeType.VARIABLE_AGGREGATOR_AAA
def _run(self) -> NodeRunResult:
outputs = {}
inputs = {}
if not self.node_data.advanced_settings or not self.node_data.advanced_settings.group_enabled:
for selector in self.node_data.variables:
variable = self.graph_runtime_state.variable_pool.get(selector)
if variable is not None:
outputs = {"output": variable.to_object()}
inputs = {".".join(selector[1:]): variable.to_object()}
break
else:
for group in self.node_data.advanced_settings.groups:
for selector in self.node_data.variables:
variable = self.graph_runtime_state.variable_pool.get(selector)
if variable is not None:
outputs[group.group_name] = {"output": variable.to_object()}
inputs[".".join(selector[1:])] = variable.to_object()
break
return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, outputs=outputs, inputs=inputs)
重写父类的属性,重新_run方法,业务逻辑写在此处。
__init__.py
文件from .variable_aggregator_node_aaa import VariableAggregatorNodeAAA
__all__ = ["VariableAggregatorNodeAAA"]
方便导入包的直接导入
这里的 __all__
是一个模块级别的特殊变量,它是一个字符串列表,当你使用 from module import *
时,模块中的哪些内容会被导入。如果模块定义了 __all__
,Python只会导入这个列表的名称,而忽略模块中其他的对象、函数或类。
api/core/workflow/nodes/enums.py
在 VARIABLE_AGGREGATOR = "variable-aggregator"
后面添加:VARIABLE_AGGREGATOR_AAA = "variable-aggregator-aaa"
api/core/workflow/nodes/node_mapping.py
NodeType.VARIABLE_AGGREGATOR_AAA: {
LATEST_VERSION: VariableAggregatorNodeAAA,
"1": VariableAggregatorNodeAAA,
}
至此后端代码修改完毕
修改 web/app/components/workflow/types.ts
新增枚举类中的属性 VariableAggregatorAAA = 'variable-aggregator-aaa',
修改 web/app/components/workflow/constants.ts
export const NODES_EXTRA_DATA: Record<BlockEnum, NodesExtraData> = {
...
[BlockEnum.VariableAggregatorAAA]: {
author: 'gitsilence',
about: '测试-学习',
availablePrevNodes: [],
availableNextNodes: [],
getAvailablePrevNodes: VariableAssignerDefault.getAvailablePrevNodes,
getAvailableNextNodes: VariableAssignerDefault.getAvailableNextNodes,
checkValid: VariableAssignerDefault.checkValid,
}
...
}
export const NODES_INITIAL_DATA = {
...
[BlockEnum.VariableAggregatorAAA]: {
type: BlockEnum.VariableAggregatorAAA,
title: '',
desc: '',
variables: [],
output_type: '',
...VariableAssignerDefault.defaultValue,
}
...
}
export const SUPPORT_OUTPUT_VARS_NODE = [
...
BlockEnum.VariableAggregatorAAA, BlockEnum.QuestionClassifier,
...
]
修改 web/app/components/workflow/block-icon.ts
const getIcon = (type: BlockEnum, className: string) => {
...
[BlockEnum.VariableAggregatorAAA]: <VariableX className={className} />
...
}
const ICON_CONTAINER_BG_COLOR_MAP: Record<string, string> = {
...
[BlockEnum.VariableAggregatorAAA]: 'bg-util-colors-blue-blue-500',
...
}
web/app/components/workflow/block-selector/constants.tsx
export const BLOCKS: Block[] = [
...
{
classification: BlockClassificationEnum.Transform,
type: BlockEnum.VariableAggregatorAAA,
title: 'Variable Aggregator aaa',
},
...
]
web/app/components/workflow/nodes/_base/components/variable/utils.ts
const formatItem = (
item: any,
isChatMode: boolean,
filterVar: (payload: Var, selector: ValueSelector) => boolean,
): NodeOutPutVar => {
...
// 这里新增一个case代码块
case BlockEnum.VariableAggregatorAAA: {
const {
output_type,
advanced_settings,
} = data as VariableAssignerNodeType
const isGroup = !!advanced_settings?.group_enabled
if (!isGroup) {
res.vars = [
{
variable: 'output',
type: output_type,
},
]
}
else {
res.vars = advanced_settings?.groups.map((group) => {
return {
variable: group.group_name,
type: VarType.object,
children: [{
variable: 'output',
type: group.output_type,
}],
}
})
}
break
}
...
}
export const getNodeUsedVars = (node: Node): ValueSelector[] => {
...
case BlockEnum.VariableAggregatorAAA: {
res = (data as VariableAssignerNodeType)?.variables
break
}
...
}
export const getNodeUsedVarPassToServerKey = (node: Node, valueSelector: ValueSelector): string | string[] => {
case BlockEnum.VariableAggregatorAAA: {
res = `#${valueSelector.join('.')}#`
break
}
}
export const updateNodeVars = (oldNode: Node, oldVarSelector: ValueSelector, newVarSelector: ValueSelector): Node => {
...
case BlockEnum.VariableAggregatorAAA: {
const payload = data as VariableAssignerNodeType
if (payload.variables) {
payload.variables = payload.variables.map((v) => {
if (v.join('.') === oldVarSelector.join('.'))
v = newVarSelector
return v
})
}
break
}
...
}
export const getNodeOutputVars = (node: Node, isChatMode: boolean): ValueSelector[] => {
...
case BlockEnum.VariableAggregatorAAA: {
res.push([id, 'output'])
break
}
...
}
web/app/components/workflow/nodes/_base/hooks/use-node-help-link.ts
const linkMap = useMemo(() => {
if (language === 'zh_Hans') {
return {
...
[BlockEnum.VariableAggregatorAAA]: 'variable-aggregator-aaa',
...
}
}
return {
...
[BlockEnum.VariableAggregatorAAA]: 'variable-aggregator-aaa',
...
}
}, [language])
React的userMemo(func, [])钩子,用于性能优化,用于缓存数据结果,避免不必要的重复计算。它接收两个参数:计算函数和依赖数组,只有在依赖项发生变化时才会重新计算。
核心特性:
web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts
const checkValidFns: Record<BlockEnum, Function> = {
...
[BlockEnum.VariableAggregatorAAA]: checkVariableAssignerValid,
...
}
web/app/components/workflow/hooks/use-nodes-interactions.ts
const newNodes = produce(nodes, (draft) => {
draft.forEach((n) => {
if (n.id === node.id && fromType === 'source' && (node.data.type === BlockEnum.VariableAssigner || node.data.type === BlockEnum.VariableAggregator || node.data.type === BlockEnum.VariableAggregatorAAA)) {
if (!node.data.advanced_settings?.group_enabled)
n.data._isEntering = true
}
if (n.id === node.id && fromType === 'target' && (connectingNode.data.type === BlockEnum.VariableAssigner || connectingNode.data.type === BlockEnum.VariableAggregator || connectingNode.data.type === BlockEnum.VariableAggregatorAAA) && node.data.type !== BlockEnum.IfElse && node.data.type !== BlockEnum.QuestionClassifier)
n.data._isEntering = true
})
})
const handleNodeConnectStart = useCallback<OnConnectStart>((_, { nodeId, handleType, handleId }) => {
if (getNodesReadOnly())
return
if (nodeId && handleType) {
const { setConnectingNodePayload } = workflowStore.getState()
const { getNodes } = store.getState()
const node = getNodes().find(n => n.id === nodeId)!
if (node.type === CUSTOM_NOTE_NODE)
return
if (node.data.type === BlockEnum.VariableAggregator || node.data.type === BlockEnum.VariableAggregatorAAA || node.data.type === BlockEnum.VariableAssigner) {
if (handleType === 'target')
return
}
setConnectingNodePayload({
nodeId,
nodeType: node.data.type,
handleType,
handleId,
})
}
}, [store, workflowStore, getNodesReadOnly])
const handleNodeConnectEnd = useCallback<OnConnectEnd>((e: any) => {
if (getNodesReadOnly())
return
const {
connectingNodePayload,
setConnectingNodePayload,
enteringNodePayload,
setEnteringNodePayload,
} = workflowStore.getState()
if (connectingNodePayload && enteringNodePayload) {
const {
setShowAssignVariablePopup,
hoveringAssignVariableGroupId,
} = workflowStore.getState()
const { screenToFlowPosition } = reactflow
const {
getNodes,
setNodes,
} = store.getState()
const nodes = getNodes()
const fromHandleType = connectingNodePayload.handleType
const fromHandleId = connectingNodePayload.handleId
const fromNode = nodes.find(n => n.id === connectingNodePayload.nodeId)!
const toNode = nodes.find(n => n.id === enteringNodePayload.nodeId)!
const toParentNode = nodes.find(n => n.id === toNode.parentId)
if (fromNode.parentId !== toNode.parentId)
return
const { x, y } = screenToFlowPosition({ x: e.x, y: e.y })
if (fromHandleType === 'source' && (toNode.data.type === BlockEnum.VariableAssigner || toNode.data.type === BlockEnum.VariableAggregator || toNode.data.type === BlockEnum.VariableAggregatorAAA)) {
const groupEnabled = toNode.data.advanced_settings?.group_enabled
const firstGroupId = toNode.data.advanced_settings?.groups[0].groupId
let handleId = 'target'
if (groupEnabled) {
if (hoveringAssignVariableGroupId)
handleId = hoveringAssignVariableGroupId
else
handleId = firstGroupId
}
const newNodes = produce(nodes, (draft) => {
draft.forEach((node) => {
if (node.id === toNode.id) {
node.data._showAddVariablePopup = true
node.data._holdAddVariablePopup = true
}
})
})
setNodes(newNodes)
setShowAssignVariablePopup({
nodeId: fromNode.id,
nodeData: fromNode.data,
variableAssignerNodeId: toNode.id,
variableAssignerNodeData: toNode.data,
variableAssignerNodeHandleId: handleId,
parentNode: toParentNode,
x: x - toNode.positionAbsolute!.x,
y: y - toNode.positionAbsolute!.y,
})
handleNodeConnect({
source: fromNode.id,
sourceHandle: fromHandleId,
target: toNode.id,
targetHandle: 'target',
})
}
}
setConnectingNodePayload(undefined)
setEnteringNodePayload(undefined)
}, [store, handleNodeConnect, getNodesReadOnly, workflowStore, reactflow])
const handleNodeAdd = useCallback<OnNodeAdd>((
{
nodeType,
sourceHandle = 'source',
targetHandle = 'target',
toolDefaultValue,
},
{
prevNodeId,
prevNodeSourceHandle,
nextNodeId,
nextNodeTargetHandle,
},
) => {
...
const {
newNode,
newIterationStartNode,
newLoopStartNode,
} = generateNewNode({
data: {
...NODES_INITIAL_DATA[nodeType],
title: nodesWithSameType.length > 0 ? `${t(`workflow.blocks.${nodeType}`)} ${nodesWithSameType.length + 1}` : t(`workflow.blocks.${nodeType}`),
...(toolDefaultValue || {}),
selected: true,
_showAddVariablePopup: (nodeType === BlockEnum.VariableAssigner || nodeType === BlockEnum.VariableAggregator || nodeType === BlockEnum.VariableAggregatorAAA) && !!prevNodeId,
_holdAddVariablePopup: false,
},
position: {
x: 0,
y: 0,
},
})
...
if (newNode.data.type === BlockEnum.VariableAssigner || newNode.data.type === BlockEnum.VariableAggregator || newNode.data.type === BlockEnum.VariableAggregatorAAA) {
const { setShowAssignVariablePopup } = workflowStore.getState()
setShowAssignVariablePopup({
nodeId: prevNode.id,
nodeData: prevNode.data,
variableAssignerNodeId: newNode.id,
variableAssignerNodeData: (newNode.data as VariableAssignerNodeType),
variableAssignerNodeHandleId: targetHandle,
parentNode: nodes.find(node => node.id === newNode.parentId),
x: -25,
y: 44,
})
}
...
if (newNode.data.type === BlockEnum.VariableAssigner || newNode.data.type === BlockEnum.VariableAggregator || newNode.data.type === BlockEnum.VariableAggregatorAAA) {
const { setShowAssignVariablePopup } = workflowStore.getState()
setShowAssignVariablePopup({
nodeId: prevNode.id,
nodeData: prevNode.data,
variableAssignerNodeId: newNode.id,
variableAssignerNodeData: newNode.data as VariableAssignerNodeType,
variableAssignerNodeHandleId: targetHandle,
parentNode: nodes.find(node => node.id === newNode.parentId),
x: -25,
y: 44,
})
}
...
}, [getNodesReadOnly, store, t, handleSyncWorkflowDraft, saveStateToHistory, workflowStore, getAfterNodesInSameBranch, checkNestedParallelLimit])
web/i18n/zh-Hans/workflow.ts
const translation = {
blocks: {
'variable-aggregator-aaa': '变量聚合器AAA',
},
blocksAbout: {
'variable-aggregator-aaa': 'aaa-将多路分支的变量聚合为一个变量,以实现下游节点统一配置。-aaa',
}
}
至此前端代码修改完毕
添加新的节点,首先要改下后端,再改下前端代码,根据自己的功能写新节点的业务、节点UI设计