Aceld

(Part 5) Golang Framework Hands-on - KisFlow Stream Computing Framework - Connector

#go

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export

To be continued.


In this chapter, we design the Connector module for KisFlow. Its main purpose is to mount the read/write logic of a third-party storage engine on a specific Function.

5.1 Connector Definition

KisFlow provides the Connector so that developers can define custom read/write plugins for third-party storage engines. If data in the data flow needs to be temporarily read from or stored to a specific storage engine, developers can write the corresponding read/write logic in a Connector and mount it on a specific Function in the Flow through configuration. Because Connectors are attached through configuration, the same storage logic can be reused by multiple Functions.

5.1.1 Connector Abstraction Layer Definition

Create a connector.go file in kis-flow/kis/ to define the Connector's abstract interface as follows:

kis-flow/kis/connector.go

package kis

import (
    "context"
    "kis-flow/config"
)

type Connector interface {
    // Init initializes the connection associated with the Connector.
    Init() error
    // Call invokes the read/write operations of the external storage logic attached to the Connector.
    Call(ctx context.Context, flow Flow, args interface{}) error
    // GetId gets the ID of the Connector.
    GetId() string
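    // GetName gets the name of the Connector.
    GetName() string
    // GetConfig gets the configuration of the Connector.
    // It is required by the conn.GetConfig() calls made later in this chapter.
    GetConfig() *config.KisConnConfig
}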

At this stage, the main interfaces provided by Connector are:

  • Init(): Mainly for the initialization logic of the third-party storage engine associated with the current Connector, such as creating connections. Init() will only be executed once in the lifecycle of the Connector instance.
  • Call(): The entry point for Connector dispatching, where the custom read/write logic of the related storage is triggered through the Call() method. The specific callback function prototype is defined in the Router module.

5.1.2 Connector Related Routing Member Type Definitions

Based on the above interface, a Connector instance needs two custom callbacks: one invoked through Init() and the other through Call(). Below are the prototypes for these two callbacks.

(1) Connector Init

kis-flow/kis/router.go

/*
    Connector Init
*/
// ConnInit is the callback function prototype for initializing the third-party mounted storage.
type ConnInit func(conn Connector) error

// connInitRouter is the router managing ConnInit callbacks.
// key: ConnName
type connInitRouter map[string]ConnInit

ConnInit is the prototype for initialization callback functions, with the Connector instance pointer as the parameter.
connInitRouter is the router managing ConnInit callbacks, where the key is ConnName.

(2) Connector Call

kis-flow/kis/router.go

/*
    Connector Call
*/
// CaaS is the callback function prototype for implementing storage read/write operations in the Connector.
type CaaS func(context.Context, Connector, Function, Flow, interface{}) error

// connFuncRouter is the router indexing CaaS callback mappings by FunctionName.
// key: Function Name
// value: CaaS callback for storage read/write operations in the Connector
type connFuncRouter map[string]CaaS

  • CaaS is the prototype for a Connector's storage read/write callback. Its parameters include the Connector, Function, and Flow instances, from which developers can obtain whatever the business logic needs. The last parameter carries custom input that the Function passes in when it dispatches the Connector Call.

  • connFuncRouter is the router indexing CaaS callbacks, where the key is the name of the Function on which the current Connector is mounted. A Connector Call must therefore be initiated by a Function.

As Connectors can only be mounted on Functions in Save or Load mode, for future convenience in statistics or indexing, it is also necessary to group connFuncRouter into Save and Load groups. Below are the type definitions for the mappings of these two groups.

kis-flow/kis/router.go

// connSL divides connFuncRouter into two sub-trees based on KisMode
// key: Function KisMode S/L
// value: connFuncRouter
type connSL map[common.KisMode]connFuncRouter

// connTree
// key: Connector Name
// value: connSL secondary tree
type connTree map[string]connSL

  • connSL divides connFuncRouter into two groups based on KisMode (Save and Load); each group is a connFuncRouter as defined above.
  • connTree indexes the secondary tree connSL by Connector Name. Combining Connector Name + Function Mode + Function Name identifies the Connector Call callback to dispatch.

5.2 Connector Routing Management

In the previous section, we defined the types of routing management needed for Connector. Now, we need to manage the addition and scheduling of these routes. Connector's routing management is unified under the KisPool module, just like Function.

5.2.1 KisPool Adds Connector-Related Routing Management Members

(1) Adding Members

kis-flow/kis/pool.go

// kisPool is used to manage all Function and Flow configurations.
type kisPool struct {
    fnRouter funcRouter   // All Function management routes
    fnLock   sync.RWMutex // Lock for fnRouter

    flowRouter flowRouter   // All flow objects
    flowLock   sync.RWMutex // Lock for flowRouter

    // +++++++++++++++++ 
    cInitRouter connInitRouter // All Connector initialization routes
    ciLock      sync.RWMutex   // Lock for cInitRouter

    cTree      connTree             // All Connector management routes
    connectors map[string]Connector // All Connector objects
    cLock      sync.RWMutex         // Lock for cTree
    // +++++++++++++++++ 
}

(2) Initialization of Related Map Variables

kis-flow/kis/pool.go

// Pool is the singleton constructor.
func Pool() *kisPool {
    _poolOnce.Do(func() {
        // Create a kisPool object
        _pool = new(kisPool)

        // Initialize fnRouter
        _pool.fnRouter = make(funcRouter)

        // Initialize flowRouter
        _pool.flowRouter = make(flowRouter)

        // +++++++++++++++++++++++++
        // Initialize cTree
        _pool.cTree = make(connTree)
        _pool.cInitRouter = make(connInitRouter)
        _pool.connectors = make(map[string]Connector)
        // +++++++++++++++++++++++++
    })

    return _pool
}

(3) Register Connector Init Method

kis-flow/kis/pool.go

// Register Connector initialization business.
func (pool *kisPool) RegisterConnectorInit(cname string, c ConnInit) {
    pool.ciLock.Lock() // Write lock
    defer pool.ciLock.Unlock()

    if _, ok := pool.cInitRouter[cname]; !ok {
        pool.cInitRouter[cname] = c
    } else {
        errString := fmt.Sprintf("KisPool Reg Connector Init Repeat CName=%s\n", cname)
        panic(errString)
    }

    log.Logger().InfoF("Add KisPool Connector Init CName=%s", cname)
}

(4) Execute Connector Init Method

kis-flow/kis/pool.go

// Dispatch Connector Init
func (pool *kisPool) DispatchConnectorInit(conn Connector) error {
    pool.ciLock.RLock() // Read lock
    defer pool.ciLock.RUnlock()

    init, ok := pool.cInitRouter[conn.GetName()]

    if !ok {
        panic(errors.New(fmt.Sprintf("init connector cname = %s not registered..", conn.GetName())))
    }

    return init(conn)
}

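The logic of both methods is straightforward. Registration takes the write lock and adds the key/value pair; dispatch takes the read lock and looks the callback up by Connector name. Because routes are set up once at startup, a duplicate registration or a missing route is treated as a programming error: the code calls panic() and the process exits.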
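
The register/dispatch pattern can be tried out in isolation with the sketch below. It is not the KisPool code: `initRegistry`, `initFn`, and `registerTwice` are hypothetical names, the callback takes a plain string instead of a Connector, and, unlike DispatchConnectorInit above, dispatch returns an error for an unknown name instead of panicking.

```go
package main

import (
    "fmt"
    "sync"
)

// initFn is a simplified stand-in for the ConnInit prototype.
type initFn func(name string) error

// initRegistry mirrors the cInitRouter + ciLock pair in kisPool.
type initRegistry struct {
    mu     sync.RWMutex
    router map[string]initFn
}

func newInitRegistry() *initRegistry {
    return &initRegistry{router: make(map[string]initFn)}
}

// register takes the write lock and panics on a duplicate name,
// because a duplicate route is a programming error found at startup.
func (r *initRegistry) register(name string, fn initFn) {
    r.mu.Lock()
    defer r.mu.Unlock()

    if _, ok := r.router[name]; ok {
        panic(fmt.Sprintf("init callback for %q registered twice", name))
    }
    r.router[name] = fn
}

// dispatch only needs the read lock for the lookup, so concurrent
// dispatches do not block each other.
func (r *initRegistry) dispatch(name string) error {
    r.mu.RLock()
    fn, ok := r.router[name]
    r.mu.RUnlock()

    if !ok {
        return fmt.Errorf("init callback for %q not registered", name)
    }
    return fn(name)
}

// registerTwice reports whether a second registration of the same name panics.
func registerTwice(r *initRegistry, name string) (panicked bool) {
    defer func() { panicked = recover() != nil }()
    r.register(name, func(string) error { return nil })
    r.register(name, func(string) error { return nil })
    return false
}

func main() {
    r := newInitRegistry()
    r.register("ConnName1", func(name string) error {
        fmt.Println("init", name)
        return nil
    })
    fmt.Println(r.dispatch("ConnName1"))       // prints "init ConnName1", then <nil>
    fmt.Println(r.dispatch("unknown") != nil)  // true
    fmt.Println(registerTwice(r, "ConnName2")) // true
}
```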

(5) Register Connector Call Method

kis-flow/kis/pool.go

// Register Connector Call business.
func (pool *kisPool) RegisterConnectorCall(cname string, fname string, mode common.KisMode, c CaaS) {
    pool.cLock.Lock() // Write lock
    defer pool.cLock.Unlock()

    if _, ok := pool.cTree[cname]; !ok {
        // First registration for this cname: create the secondary tree connSL
        pool.cTree[cname] = make(connSL)

        // Initialize various FunctionMode
        pool.cTree[cname][common.S] = make(connFuncRouter)
        pool.cTree[cname][common.L] = make(connFuncRouter)
    }

    if _, ok := pool.cTree[cname][mode][fname]; !ok {
        pool.cTree[cname][mode][fname] = c
    } else {
        errString := fmt.Sprintf("Connector Call Repeat CName=%s, FName=%s, Mode =%s\n", cname, fname, mode)
        panic(errString)
    }

    log.Logger().InfoF("Add KisPool Connector Call CName=%s, FName=%s, Mode =%s", cname, fname, mode)
}

The RegisterConnectorCall method registers the callback that implements your Connector's read/write logic. The callback is added to the routing group selected by the Connector name, Function mode, and Function name passed in.

(6) Execute Connector Call Method

kis-flow/kis/pool.go

// Dispatch Connector Call
func (pool *kisPool) DispatchConnectorCall(ctx context.Context, flow Flow, conn Connector, args interface{}) error {
    fn := flow.GetThisFunction()
    fnConf := fn.GetConfig()
    mode := common.KisMode(fnConf.FMode)

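    // Look up the callback under the read lock (cTree may be written by
    // RegisterConnectorCall), then release the lock before running it.
    pool.cLock.RLock()
    callback, ok := pool.cTree[conn.GetName()][mode][fnConf.FName]
    pool.cLock.RUnlock()

    if ok {
        return callback(ctx, conn, fn, flow, args)
    }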

    log.Logger().ErrorFX(ctx, "CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.\n", conn.GetName(), fnConf.FName, mode)

    return errors.New(fmt.Sprintf("CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.", conn.GetName(), fnConf.FName, mode))
}

DispatchConnectorCall is used to index and find the corresponding CaaS function based on ConnectorName, Function Mode, and Function Name, and then execute it.

5.3 KisConnector

Next, let's define and implement the KisConnector module according to the abstraction layer of Connector. Create a kis_connector.go file in the kis-flow/conn/ directory.

5.3.1 Definition of KisConnector

package conn

import (
    "context"
    "kis-flow/common"
    "kis-flow/config"
    "kis-flow/id"
    "kis-flow/kis"
    "sync"
)

type KisConnector struct {
    // Connector ID
    CId string
    // Connector Name
    CName string
    // Connector Config
    Conf *config.KisConnConfig

    // Connector Init
    onceInit sync.Once
}

In addition to identifying the instance with a KisID and a name, KisConnector also includes the configuration information KisConnConfig for the current KisConnector. It contains a sync.Once member, which is used to control the execution of Connector Init only once in its lifecycle, as will be discussed later.

5.3.2 Constructor Method for KisConnector

// NewKisConnector creates a KisConnector based on the provided configuration.
func NewKisConnector(config *config.KisConnConfig) *KisConnector {
    conn := new(KisConnector)
    conn.CId = id.KisID(common.KisIdTypeConnnector)
    conn.CName = config.CName
    conn.Conf = config

    return conn
}

Creating a KisConnector instance requires a KisConnConfig.

5.3.3 Implementation of Connector Interface

// Init initializes the storage engine connection associated with the Connector.
func (conn *KisConnector) Init() error {
    var err error

    // Ensure Init is executed only once for a Connector.
    conn.onceInit.Do(func() {
        err = kis.Pool().DispatchConnectorInit(conn)
    })

    return err
}

// Call invokes the read/write operations of the external storage logic for the Connector.
func (conn *KisConnector) Call(ctx context.Context, flow kis.Flow, args interface{}) error {
    return kis.Pool().DispatchConnectorCall(ctx, flow, conn, args)
}

func (conn *KisConnector) GetName() string {
    return conn.CName
}
func (conn *KisConnector) GetConfig() *config.KisConnConfig {
    return conn.Conf
}

func (conn *KisConnector) GetId() string {
    return conn.CId
}

  • Init() uses sync.Once to ensure that it runs only once per Connector instance; the actual initialization is routed and dispatched through KisPool.
  • Similarly, Call() is dispatched through KisPool.
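
One detail of combining sync.Once with a returned error is worth seeing in isolation. In the sketch below, the `onceConn` type and its always-failing init callback are hypothetical. `InitLocal` keeps the error in a local variable, as Init() above does, so only the first call can report it; later calls skip the closure and return nil. `InitSticky` is a possible variant that stores the error in a field, shown only for comparison.

```go
package main

import (
    "errors"
    "fmt"
    "sync"
)

// onceConn is a hypothetical connector whose init callback always fails.
type onceConn struct {
    once    sync.Once
    initErr error // used only by InitSticky
    calls   int   // counts how often the init callback really ran
}

func (c *onceConn) doInit() error {
    c.calls++
    return errors.New("dial failed")
}

// InitLocal keeps the error in a local variable.
func (c *onceConn) InitLocal() error {
    var err error
    c.once.Do(func() { err = c.doInit() })
    return err
}

// InitSticky stores the error so that every caller sees the first result.
func (c *onceConn) InitSticky() error {
    c.once.Do(func() { c.initErr = c.doInit() })
    return c.initErr
}

func main() {
    a := &onceConn{}
    fmt.Println(a.InitLocal()) // dial failed
    fmt.Println(a.InitLocal()) // <nil>: the closure is skipped, err stays nil
    fmt.Println(a.calls)       // 1

    b := &onceConn{}
    fmt.Println(b.InitSticky()) // dial failed
    fmt.Println(b.InitSticky()) // dial failed
    fmt.Println(b.calls)        // 1
}
```

Whether the difference matters depends on how often Init() is called on the same instance; if it is called exactly once per Connector, both forms behave the same.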

So, when will Init() and Call() of KisConnector be called? Next, we need to implement KisConnConfig to associate the hierarchical relationship of Connector with Function and Flow.

5.4 KisConnConfig Configuration

In the NewKisConnector() function, the parameter is KisConnConfig, so developers need to create a KisConnConfig, which contains the configuration information for a Connector instance. In version 0.1, we have already implemented the definition and creation of the KisConnConfig module. The code is as follows:

kis-flow/config/kis_conn_config.go

package config

import (
    "errors"
    "fmt"
    "kis-flow/common"
)

// KisConnConfig describes the configuration for a KisConnector
type KisConnConfig struct {
    // Configuration type
    KisType string `yaml:"kistype"`
    // Unique identifier
    CName string `yaml:"cname"`
    // Base storage medium address
    AddrString string `yaml:"addrs"`
    // Storage medium engine type: "Mysql", "Redis", "Kafka", etc.
    Type common.KisConnType `yaml:"type"`
    // Identifier for a single storage: e.g., Key name for Redis, Table name for Mysql, Topic name for Kafka, etc.
    Key string `yaml:"key"`
    // Custom parameters in the configuration information
    Params map[string]string `yaml:"params"`
    // Names of the Functions bound to this Connector for storage reading (Load) and writing (Save)
    Load []string `yaml:"load"`
    Save []string `yaml:"save"`
}

// NewConnConfig creates a KisConnector strategy configuration object that describes one KisConnector
func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {
    strategy := new(KisConnConfig)
    strategy.CName = cName
    strategy.AddrString = addr

    strategy.Type = t
    strategy.Key = key
    strategy.Params = param

    return strategy
}

// WithFunc binds Connector with Function
func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {

    switch common.KisMode(fConfig.FMode) {
    case common.S:
        cConfig.Save = append(cConfig.Save, fConfig.FName)
    case common.L:
        cConfig.Load = append(cConfig.Load, fConfig.FName)
    default:
        return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.FMode))
    }

    return nil
}

5.4.1 Adding KisConnConfig Member to KisFuncConfig

First, we add a KisConnConfig member to KisFuncConfig.

kis-flow/config/kis_func_config.go

// KisFuncConfig represents a KisFunction strategy configuration
type KisFuncConfig struct {
    KisType  string        `yaml:"kistype"`
    FName    string        `yaml:"fname"`
    FMode    string        `yaml:"fmode"`
    Source   KisSource     `yaml:"source"`
    Option   KisFuncOption `yaml:"option"`
    // ++++++++++
    connConf *KisConnConfig
}

Then, we provide methods to add and retrieve information about this member.

kis-flow/config/kis_func_config.go

func (fConf *KisFuncConfig) AddConnConfig(cConf *KisConnConfig) error {
    if cConf == nil {
        return errors.New("KisConnConfig is nil")
    }

    // Connector needs to be associated with Function.
    // WithFunc fails if the Function's mode is neither Save nor Load.
    if err := cConf.WithFunc(fConf); err != nil {
        return err
    }

    // Function needs to be associated with Connector
    fConf.connConf = cConf

    return nil
}

func (fConf *KisFuncConfig) GetConnConfig() (*KisConnConfig, error) {
    if fConf.connConf == nil {
        return nil, errors.New("KisFuncConfig.connConf not set")
    }

    return fConf.connConf, nil
}

This way, we can retrieve the associated Connector configuration information through the Function's configuration information.
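
The two-way binding between the two configuration objects can be illustrated with a reduced, self-contained sketch. `connConfig` and `funcConfig` are hypothetical, trimmed-down versions of KisConnConfig and KisFuncConfig, the mode is a plain string, and "Calculate" is an assumed example of a mode that is neither Save nor Load. The sketch propagates the error from `withFunc`, which is one possible design and is not presented as the framework's behavior.

```go
package main

import "fmt"

// connConfig keeps only the fields used for binding.
type connConfig struct {
    CName string
    Load  []string
    Save  []string
}

// funcConfig keeps only the fields used for binding.
type funcConfig struct {
    FName    string
    FMode    string // "Save", "Load", or another mode
    connConf *connConfig
}

// withFunc records the Function name under the list matching its mode.
func (c *connConfig) withFunc(f *funcConfig) error {
    switch f.FMode {
    case "Save":
        c.Save = append(c.Save, f.FName)
    case "Load":
        c.Load = append(c.Load, f.FName)
    default:
        return fmt.Errorf("mode %q cannot carry a connector", f.FMode)
    }
    return nil
}

// addConnConfig links both directions and leaves f untouched
// if the mode is rejected.
func (f *funcConfig) addConnConfig(c *connConfig) error {
    if c == nil {
        return fmt.Errorf("connConfig is nil")
    }
    if err := c.withFunc(f); err != nil {
        return err
    }
    f.connConf = c
    return nil
}

func main() {
    conn := &connConfig{CName: "ConnName1"}
    saver := &funcConfig{FName: "funcName2", FMode: "Save"}
    calc := &funcConfig{FName: "funcName1", FMode: "Calculate"}

    fmt.Println(saver.addConnConfig(conn), conn.Save) // <nil> [funcName2]
    fmt.Println(calc.addConnConfig(conn) != nil)      // true
    fmt.Println(calc.connConf == nil)                 // true
}
```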

5.5 Function/Flow and Connector Association

5.5.1 Function and Connector Association

kis-flow/kis/function.go

package kis

import (
    "context"
    "kis-flow/config"
)

// Function represents the basic calculation unit in stream computing. A KisFunction is a basic calculation logic unit in stream computing,
// and any number of KisFunctions can be combined into a KisFlow.
type Function interface {
    // Call executes the stream computing logic
    Call(ctx context.Context, flow Flow) error

    // SetConfig configures the current Function instance
    SetConfig(s *config.KisFuncConfig) error
    // GetConfig gets the configuration of the current Function instance
    GetConfig() *config.KisFuncConfig

    // SetFlow sets the Flow instance that the current Function instance depends on
    SetFlow(f Flow) error
    // GetFlow gets the Flow that the current Function instance depends on
    GetFlow() Flow

    // ++++++++++++++++++++
    // AddConnector adds a Connector to the current Function instance
    AddConnector(conn Connector) error
    // GetConnector gets the Connector associated with the current Function instance
    GetConnector() Connector
    // ++++++++++++++++++++

    // CreateId generates a random instance KisID for the current Function instance
    CreateId()
    // GetId gets the FID of the current Function
    GetId() string
    // GetPrevId gets the FID of the previous Function node on the current Function
    GetPrevId() string
    // GetNextId gets the FID of the next Function node on the current Function
    GetNextId() string

    // Next returns the next layer of calculation stream Function, returns nil if the current layer is the last layer
    Next() Function
    // Prev returns the previous layer of calculation stream Function, returns nil if the current layer is the first layer
    Prev() Function
    // SetN sets the next Function instance
    SetN(f Function)
    // SetP sets the previous Function instance
    SetP(f Function)
}

Next, we implement this in BaseFunction:

type BaseFunction struct {
    // Id, the instance ID of KisFunction, used to distinguish different instance objects internally in KisFlow
    Id     string
    Config *config.KisFuncConfig

    // flow
    flow kis.Flow // Context environment KisFlow

    // ++++++++++++++
    // connector
    connector kis.Connector
    // ++++++++++++++

    // link
    N kis.Function // Next stream computing Function
    P kis.Function // Previous stream computing Function
}

// ........
// ........

// AddConnector adds a Connector to the current Function instance
func (base *BaseFunction) AddConnector(conn kis.Connector) error {
    if conn == nil {
        return errors.New("conn is nil")
    }

    base.connector = conn

    return nil
}

// GetConnector gets the Connector associated with the current Function instance
func (base *BaseFunction) GetConnector() kis.Connector {
    return base.connector
}

This allows a Function instance to obtain information about the Connector instance.

5.5.2 Flow and Connector Association

Similarly, Flow also needs to obtain information about the Connector. This requires a simple association between Flow and Connector.

kis-flow/kis/flow.go

package kis

import (
    "context"
    "kis-flow/common"
    "kis-flow/config"
)

type Flow interface {
    // Run schedules the Flow, sequentially schedules the Functions in the Flow and executes them
    Run(ctx context.Context) error
    // Link connects the Functions in the Flow according to the configuration in the configuration file
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow submits Flow data to the upcoming Function layer
    CommitRow(row interface{}) error
    // Input gets the input source data of the currently executing Function in the Flow
    Input() common.KisRowArr
    // GetName gets the name of the Flow
    GetName() string
    // GetThisFunction gets the currently executing Function
    GetThisFunction() Function
    // GetThisFuncConf gets the configuration of the currently executing Function
    GetThisFuncConf() *config.KisFuncConfig
    // +++++++++++++++++++++++++++++++++
    // GetConnector gets the Connector of the currently executing Function
    GetConnector() (Connector, error)
    // GetConnConf gets the configuration of the Connector of the currently executing Function
    GetConnConf() (*config.KisConnConfig, error)
    // +++++++++++++++++++++++++++++++++
}

We add GetConnector() and GetConnConf() methods to obtain the Connector instance and Connector configuration. Then, we implement these methods in KisFlow.

// GetConnector gets the Connector of the currently executing Function
func (flow *KisFlow) GetConnector() (kis.Connector, error) {
    if conn := flow.ThisFunction.GetConnector(); conn != nil {
        return conn, nil
    } else {
        return nil, errors.New("GetConnector(): Connector is nil")
    }
}

// GetConnConf gets the configuration of the Connector of the currently executing Function
func (flow *KisFlow) GetConnConf() (*config.KisConnConfig, error) {
    if conn := flow.ThisFunction.GetConnector(); conn != nil {
        return conn.GetConfig(), nil
    } else {
        return nil, errors.New("GetConnConf(): Connector is nil")
    }
}

This allows business callbacks to obtain the Connector instance and its configuration for the currently executing Function directly from the Flow.

5.5.3 Linking Function to Connector

According to the previous configuration file definition, the function's YAML configuration file is as follows:

kistype: func
fname: TestFunction_L1
fmode: Load
source:
  name: testSource
  must:
    - stuid
    - classid
option:
  cname: Test-NsConnector_2

Here, option has a member cname. If the current Function is configured with a Connector, set cname under option to the name of that Connector.

When a Flow is linked to a Function, after the Function instance is created, if the Function carries a Connector, a Connector instance also needs to be created. This can be achieved through the Function's configuration information.

kis-flow/flow/kis_flow.go

// Link connects the Function to the Flow
// fConf: Current Function strategy
// fParams: Dynamic parameters carried by the current Flow
func (flow *KisFlow) Link(fConf *config.KisFuncConfig, fParams config.FParam) error {
    // Create Function instance
    f := function.NewKisFunction(flow, fConf)

    // ++++++++++++++++++++++++++++++
    if fConf.Option.CName != "" {
        // The current Function is associated with a Connector and needs to initialize the Connector instance

        // Get Connector configuration
        connConfig, err := fConf.GetConnConfig()
        if err != nil {
            panic(err)
        }

        // Create Connector object
        connector := conn.NewKisConnector(connConfig)

        // Initialize the Connector, execute the Connector Init method
        if err = connector.Init(); err != nil {
            panic(err)
        }

        // Associate the Function instance with the Connector instance
        _ = f.AddConnector(connector)
    }
    // ++++++++++++++++++++++++++++++


    // Flow adds Function
    if err := flow.appendFunc(f, fParams); err != nil {
        return err
    }

    return nil
}

This creates a Connector instance.
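
The wiring performed at link time can be condensed into the following self-contained sketch. All types here (`connConf`, `funcConf`, `connector`, `function`) and the `link` helper are hypothetical miniatures, not the framework's own, and the sketch returns errors where the Link method above panics.

```go
package main

import (
    "errors"
    "fmt"
)

// connConf is a miniature of the Connector configuration.
type connConf struct{ CName string }

// funcConf is a miniature of the Function configuration.
type funcConf struct {
    FName    string
    CName    string    // mirrors option.cname; empty means no connector
    connConf *connConf // set beforehand by the binding step
}

type connector struct {
    name   string
    inited bool
}

func (c *connector) init() error {
    c.inited = true // a real callback would open the storage connection here
    return nil
}

type function struct {
    name string
    conn *connector
}

// link builds a function and, only when a cname is configured,
// creates, initializes, and attaches its connector.
func link(fc *funcConf) (*function, error) {
    f := &function{name: fc.FName}
    if fc.CName == "" {
        return f, nil
    }
    if fc.connConf == nil {
        return nil, errors.New("cname is set but no connector config is bound")
    }
    c := &connector{name: fc.connConf.CName}
    if err := c.init(); err != nil {
        return nil, fmt.Errorf("init connector %s: %w", c.name, err)
    }
    f.conn = c
    return f, nil
}

func main() {
    plain, _ := link(&funcConf{FName: "funcName1"})
    fmt.Println(plain.conn == nil) // true

    saver, _ := link(&funcConf{
        FName:    "funcName2",
        CName:    "ConnName1",
        connConf: &connConf{CName: "ConnName1"},
    })
    fmt.Println(saver.conn.name, saver.conn.inited) // ConnName1 true

    _, err := link(&funcConf{FName: "funcName3", CName: "ConnName1"})
    fmt.Println(err != nil) // true
}
```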

5.6 KisConnector Unit Testing

Next, let's perform unit testing on KisConnector.

5.6.1 Unit Testing

Create the kis-flow/test/kis_connector_test.go file:

package test

import (
    "context"
    "kis-flow/common"
    "kis-flow/config"
    "kis-flow/flow"
    "kis-flow/kis"
    "kis-flow/test/caas"
    "kis-flow/test/faas"
    "testing"
)

func TestNewKisConnector(t *testing.T) {

    ctx := context.Background()

    // 0. Register Function callback business
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. Register ConnectorInit and Connector callback business
    kis.Pool().RegisterConnectorInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().RegisterConnectorCall("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. Create 3 KisFunction configuration instances, where myFuncConfig2 has Connector configuration
    source1 := config.KisSource{
        Name: "Public account Douyin mall customer order data",
        Must: []string{"order_id", "user_id"},
    }

    source2 := config.KisSource{
        Name: "User order error rate",
        Must: []string{"order_id", "user_id"},
    }

    myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil)
    if myFuncConfig1 == nil {
        panic("myFuncConfig1 is nil")
    }

    option := config.KisFuncOption{
        CName: "ConnName1",
    }

    myFuncConfig2 := config.NewFuncConfig("funcName2", common.S, &source2, &option)
    if myFuncConfig2 == nil {
        panic("myFuncConfig2 is nil")
    }

    myFuncConfig3 := config.NewFuncConfig("funcName3", common.E, &source2, nil)
    if myFuncConfig3 == nil {
        panic("myFuncConfig3 is nil")
    }

    // 2. Create a KisConnector configuration instance
    myConnConfig1 := config.NewConnConfig("ConnName1", "0.0.0.0:9998", common.REDIS, "redis-key", nil)
    if myConnConfig1 == nil {
        panic("myConnConfig1 is nil")
    }

    // 3. Bind the KisConnector configuration instance to the KisFunction configuration instance
    _ = myFuncConfig2.AddConnConfig(myConnConfig1)

    // 4. Create a KisFlow configuration instance
    myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)

    // 5. Create a KisFlow object
    flow1 := flow.NewKisFlow(myFlowConfig1)

    // 6. Concatenate Functions to Flow
    if err := flow1.Link(myFuncConfig1, nil); err != nil {
        panic(err)
    }
    if err := flow1.Link(myFuncConfig2, nil); err != nil {
        panic(err)
    }
    if err := flow1.Link(myFuncConfig3, nil); err != nil {
        panic(err)
    }

    // 7. Submit original data
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 8. Execute flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}


Note that funcName2 is the Function associated with the Connector. So when creating the Config for Function2, Option information is provided, and the name of the associated Connector is provided.

5.6.2 Function Callback and Connector Callback

To keep the callback business manageable, we create the faas/ and caas/ directories under kis-flow/test/.
Each file in these directories contains one custom callback.

├── caas
│   ├── caas_demo1.go
│   └── caas_init1.go
├── faas
│   ├── faas_demo1.go
│   ├── faas_demo2.go
│   └── faas_demo3.go

(1) Callback business for FuncName1

kis-flow/test/faas/faas_demo1.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

func FuncDemo1Handler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call funcName1Handler ----")

    for index, row := range flow.Input() {
        // Print data
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)

        // Calculate result data
        resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

        // Submit result data
        _ = flow.CommitRow(resultStr)
    }

    return nil
}

This serves as our first Function, printing data and generating some more data.

(2) Callback business for FuncName2

kis-flow/test/faas/faas_demo2.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
    "kis-flow/log"
)

func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call funcName2Handler ----")

    for index, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)

        conn, err := flow.GetConnector()
        if err != nil {
            log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): GetConnector err = %s\n", err.Error())
            return err
        }

        // Capture the error returned by Call; the outer err is nil at this point.
        if err := conn.Call(ctx, flow, row); err != nil {
            log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): Call err = %s\n", err.Error())
            return err
        }

        // Calculate result data
        resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

        // Submit result data
        _ = flow.CommitRow(resultStr)
    }

    return nil
}


FuncName2 is the business callback associated with the Connector. It obtains the Connector instance through flow.GetConnector() and then runs the storage logic by calling conn.Call().

(3) Callback business for FuncName3

kis-flow/test/faas/faas_demo3.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call funcName3Handler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return nil
}


(4) Init method for ConnName1

kis-flow/test/caas/caas_init1.go

package caas

import (
    "fmt"
    "kis-flow/kis"
)

func InitConnDemo1(connector kis.Connector) error {
    fmt.Println("===> Call Connector InitDemo1")
    //config info
    connConf := connector.GetConfig()

    fmt.Println(connConf)

    // init connector, such as initializing database connection, etc.

    return nil
}


(5) Callback business for ConnName1

kis-flow/test/caas/caas_demo1.go

package caas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) error {
    fmt.Printf("===> In CaasDemoHanler1: flowName: %s, cName:%s, fnName:%s, mode:%s\n",
        flow.GetName(), conn.GetName(), fn.GetConfig().FName, fn.GetConfig().FMode)

    fmt.Printf("===> Call Connector CaasDemoHanler1, args from function: %s\n", args)

    return nil
}

5.6.3 Running

Navigate to kis-flow/test/ directory, and execute the command:

go test -test.v -test.paniconexit0 -test.run TestNewKisConnector

Results:

=== RUN   TestNewKisConnector
Add KisPool FuncName=funcName1
Add KisPool FuncName=funcName2
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{ ConnName1 0.0.0.0:9998 redis redis-key map[] [] [funcName2]}
context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-e5bdad6cb44e47c4b51ffdd4f53148fe
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionC, flow = &{Id:flow-e5bdad6cb44e47c4b51ffdd4f53148fe Name:flowName1 Conf:0xc000026800 Funcs:map[func-66e2b0afa4e14d179aa94c357c412cf8:0xc00007c300 func-f0b4bebf87614828a9375d888c54d13b:0xc00007c2a0 func-f594da0e28da417db6b15ce9c9530f84:0xc00007c240] FlowHead:0xc00007c240 FlowTail:0xc00007c300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00007c240 ThisFunctionId:func-f594da0e28da417db6b15ce9c9530f84 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-66e2b0afa4e14d179aa94c357c412cf8:map[] func-f0b4bebf87614828a9375d888c54d13b:map[] func-f594da0e28da417db6b15ce9c9530f84:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-f594da0e28da417db6b15ce9c9530f84, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-f594da0e28da417db6b15ce9c9530f84, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-f594da0e28da417db6b15ce9c9530f84, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName1, flow_id = flow-e5bdad6cb44e47c4b51ffdd4f53148fe
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionS, flow = &{Id:flow-e5bdad6cb44e47c4b51ffdd4f53148fe Name:flowName1 Conf:0xc000026800 Funcs:map[func-66e2b0afa4e14d179aa94c357c412cf8:0xc00007c300 func-f0b4bebf87614828a9375d888c54d13b:0xc00007c2a0 func-f594da0e28da417db6b15ce9c9530f84:0xc00007c240] FlowHead:0xc00007c240 FlowTail:0xc00007c300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00007c2a0 ThisFunctionId:func-f0b4bebf87614828a9375d888c54d13b PrevFunctionId:func-f594da0e28da417db6b15ce9c9530f84 funcParams:map[func-66e2b0afa4e14d179aa94c357c412cf8:map[] func-f0b4bebf87614828a9375d888c54d13b:map[] func-f594da0e28da417db6b15ce9c9530f84:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]}

---> Call funcName2Handler ----
In FuncName = funcName2, FuncId = func-f0b4bebf87614828a9375d888c54d13b, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-f0b4bebf87614828a9375d888c54d13b, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-f0b4bebf87614828a9375d888c54d13b, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
 ====> After commitCurData, flow_name = flowName1, flow_id = flow-e5bdad6cb44e47c4b51ffdd4f53148fe
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f0b4bebf87614828a9375d888c54d13b:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionE, flow = &{Id:flow-e5bdad6cb44e47c4b51ffdd4f53148fe Name:flowName1 Conf:0xc000026800 Funcs:map[func-66e2b0afa4e14d179aa94c357c412cf8:0xc00007c300 func-f0b4bebf87614828a9375d888c54d13b:0xc00007c2a0 func-f594da0e28da417db6b15ce9c9530f84:0xc00007c240] FlowHead:0xc00007c240 FlowTail:0xc00007c300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00007c300 ThisFunctionId:func-66e2b0afa4e14d179aa94c357c412cf8 PrevFunctionId:func-f0b4bebf87614828a9375d888c54d13b funcParams:map[func-66e2b0afa4e14d179aa94c357c412cf8:map[] func-f0b4bebf87614828a9375d888c54d13b:map[] func-f594da0e28da417db6b15ce9c9530f84:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f0b4bebf87614828a9375d888c54d13b:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f594da0e28da417db6b15ce9c9530f84:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]}

---> Call funcName3Handler ----
In FuncName = funcName3, FuncId = func-66e2b0afa4e14d179aa94c357c412cf8, row = data from funcName[funcName2], index = 0
In FuncName = funcName3, FuncId = func-66e2b0afa4e14d179aa94c357c412cf8, row = data from funcName[funcName2], index = 1
In FuncName = funcName3, FuncId = func-66e2b0afa4e14d179aa94c357c412cf8, row = data from funcName[funcName2], index = 2
--- PASS: TestNewKisConnector (0.00s)
PASS

The logs show that the Connector's initialization ran once at startup (===> Call Connector InitDemo1). They also show that the Connector callback CaasDemoHanler1 was invoked synchronously for each of the three rows while funcName2 was executing. Both match our expectations.

5.7 [V0.4] Source Code

You can find the source code for version 0.4 at the following link:

https://github.com/aceld/kis-flow


Author: Aceld
GitHub: https://github.com/aceld

KisFlow Open Source Project Address: https://github.com/aceld/kis-flow

Document: https://github.com/aceld/kis-flow/wiki

