Hello, this is Sugimon :yum:, the Yorozu counselor. In this series, I develop a component (adapter) using the SDK of HULFT IoT EdgeStreaming. The series is planned to be completed in three parts.
This time, as the second part, I will develop a plug-in using the HULFT IoT EdgeStreaming Plugin SDK. Setup and execution are covered in separate articles, so please refer to them as well.
:arrow_forward: [I tried to create a plug-in with HULFT IoT EdgeStreaming (Setup)](https://qiita.com/sugimon/items/cf7503479c6e4c46c3b3)
:arrow_forward: I tried to create a plug-in with HULFT IoT EdgeStreaming (Development)
:arrow_forward: [I tried to create a plug-in with HULFT IoT EdgeStreaming (Execution)](https://qiita.com/sugimon/items/93f47d7bd472a8b18e54)
The configuration for creating an EdgeStreaming plug-in is as follows.
The Runtime is the part that executes the streaming process specified in Studio.
- SourceOperation: An operation that generates streaming data (Tuples). This is the "input processing".
- SinkOperation: An operation that outputs streaming data (Tuples) to the outside. This is the "output processing".
- UDSFOperation: An operation that converts streaming data (Tuples) and outputs the converted stream (Tuples). This is the "input/output (conversion) processing".
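As a rough mental model of the three operation types above, the following sketch chains a source, a conversion step, and a sink using plain Go functions. It does not use the SDK at all; `Tuple`, `source`, `convert`, and `sink` are hypothetical names chosen only for illustration.

```go
package main

import "fmt"

// Tuple is a hypothetical stand-in for one unit of streaming data.
type Tuple map[string]float64

// source plays the role of a SourceOperation: it generates Tuples.
func source(values []float64) []Tuple {
	out := make([]Tuple, 0, len(values))
	for _, v := range values {
		out = append(out, Tuple{"value": v})
	}
	return out
}

// convert plays the role of a UDSFOperation: it receives Tuples and
// emits converted Tuples (here, each value is doubled).
func convert(in []Tuple) []Tuple {
	out := make([]Tuple, 0, len(in))
	for _, t := range in {
		out = append(out, Tuple{"value": t["value"] * 2})
	}
	return out
}

// sink plays the role of a SinkOperation: it hands Tuples to the outside
// (here, it formats them as text lines).
func sink(in []Tuple) []string {
	lines := make([]string, 0, len(in))
	for _, t := range in {
		lines = append(lines, fmt.Sprintf("value=%.1f", t["value"]))
	}
	return lines
}

func main() {
	for _, line := range sink(convert(source([]float64{1, 2.5}))) {
		fmt.Println(line)
	}
}
```

The real Runtime processes Tuples continuously rather than in slices, but the direction of the data (source, then conversion, then sink) is the same.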
The Studio side generates BQL (an SQL-like language) that tells the Runtime how to perform the streaming process.
- AdapterModuleComponent: A class that represents a component (adapter). One component holds multiple operations.
- BQLPluginSourceOperationFactory: Defines the properties of the SourceOperation ("input processing") and outputs the BQL (CREATE SOURCE statement) that creates the Source operation.
- BQLPluginOutputOperationFactory: Defines the properties of the SinkOperation ("output processing") and outputs the BQL (CREATE SINK statement) that creates the Sink operation.
- BQLPluginUDSFOperationFactory: Defines the properties of the UDSF ("input/output (conversion) processing") and outputs the BQL (SELECT statement) that creates the UDSF operation.
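To make the idea of "outputting a CREATE SOURCE or CREATE SINK statement" more concrete, here is a minimal sketch that assembles such statements as strings. It assumes the general SensorBee-style form `CREATE SOURCE <name> TYPE <type> WITH <key> = <value>;`. The exact text that Studio generates is not shown in this article, so the helper `createStatement` and the names `src` and `out` are purely hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// createStatement builds a statement of the assumed form
//   CREATE <kind> <name> TYPE <typeName> WITH k1 = v1, k2 = v2;
// Values must already be formatted as literals by the caller.
func createStatement(kind, name, typeName string, params map[string]string) string {
	keys := make([]string, 0, len(params))
	for k := range params {
		keys = append(keys, k)
	}
	sort.Strings(keys) // fixed order so the output is reproducible

	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, k+" = "+params[k])
	}

	stmt := fmt.Sprintf("CREATE %s %s TYPE %s", kind, name, typeName)
	if len(pairs) > 0 {
		stmt += " WITH " + strings.Join(pairs, ", ")
	}
	return stmt + ";"
}

func main() {
	fmt.Println(createStatement("SOURCE", "src", "sample_source",
		map[string]string{"interval": "1.0"}))
	fmt.Println(createStatement("SINK", "out", "sample_sink",
		map[string]string{"decimal": "2"}))
}
```

The type names `sample_source` and `sample_sink` match the names used later in this article; everything else in the sketch is an assumption.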
This time, I tried to create a plug-in in the following form.
**・Basic information**
**・Operation information**
First, implement the Runtime side, which executes the streaming process specified in Studio.
**Environment preparation**
$SDK_HOME/dev/go/src/github.sis.saison.co.jp/sherpa/es-agent/sample
$SDK_HOME/dev/go/src/github.sis.saison.co.jp/sherpa/es-agent/sample/plugin
$SDK_HOME/dev/go/src/github.sis.saison.co.jp/sherpa/es-agent/sample/external
**Creating the source files**
Let's create source files with the following file names in the module directory (sample in this case).
- The file structure is as follows.
├─sample
│  ├─source.go (input processing)
│  ├─sink.go (output processing)
│  ├─udsf.go (input/output conversion processing)
│  │
│  ├─external
│  │  └─plugin_main.go (main function)
│  │
│  └─plugin
│     └─plugin.go (registers the source files under sample)
Now let's create the source file.
**・source.go (input processing)**
Create a process that generates pseudo-random numbers at regular time intervals. The main flow is as follows.
Tuple
{
"type": "object",
"required": ["payload"],
"properties": {
"payload": {
"type": "object",
"required": ["value"],
"properties": {
"value": {
"type": "number"
}
}
}
}
}
The output Tuple JSON data has the following format.
{"payload": {"value": 3.5423242}}
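To show how this shape maps onto data structures, the following sketch encodes and decodes the same JSON with Go's standard `encoding/json` package. It is independent of the SDK; the types `tuple` and `payload` and the helpers `encodeTuple` and `decodeValue` are hypothetical names used only here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// payload and tuple mirror the schema above: an object with a required
// "payload" object that holds a numeric "value".
type payload struct {
	Value float64 `json:"value"`
}

type tuple struct {
	Payload payload `json:"payload"`
}

// encodeTuple wraps a value in the {"payload": {"value": ...}} shape.
func encodeTuple(value float64) (string, error) {
	b, err := json.Marshal(tuple{Payload: payload{Value: value}})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// decodeValue extracts payload.value from a JSON document.
// Note: it does not enforce the "required" constraints of the schema;
// a missing field simply decodes as 0.
func decodeValue(doc string) (float64, error) {
	var t tuple
	if err := json.Unmarshal([]byte(doc), &t); err != nil {
		return 0, err
	}
	return t.Payload.Value, nil
}

func main() {
	doc, err := encodeTuple(3.5423242)
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Println(doc)

	value, err := decodeValue(doc)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(value)
}
```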
I created source.go as follows.
source.go
package sample
import (
"math/rand"
"time"
"gopkg.in/sensorbee/sensorbee.v0/bql"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type source struct {
interval time.Duration
term chan struct{}
}
func (s *source) GenerateStream(ctx *core.Context, w core.Writer) error {
rand.Seed(time.Now().UnixNano())
next := time.Now()
for {
val := rand.Float64()
m := data.Map{"value": data.Float(val)}
t := core.NewTuple(data.Map{"payload": m})
if s.interval > 0 {
t.Timestamp = next
}
ctx.Log().Debug("generation: ", val)
if err := w.Write(ctx, t); err != nil {
return err
}
if s.interval > 0 {
now := time.Now()
next = next.Add(s.interval)
if next.Before(now) {
next = now.Add(s.interval)
}
select {
case <-s.term:
return core.ErrSourceStopped
case <-time.After(next.Sub(now)):
}
}
}
return nil
}
func (s *source) Stop(ctx *core.Context) error {
s.term <- struct{}{}
return nil
}
func CreateSource(ctx *core.Context, ioParams *bql.IOParams, params data.Map) (core.Source, error) {
interval, err := getInterval(params)
if err != nil {
return nil, err
}
return &source{
interval: interval,
term: make(chan struct{}),
}, nil
}
func getInterval(params data.Map) (time.Duration, error) {
interval := 1 * time.Second
if v, ok := params["interval"]; ok {
i, err := data.ToDuration(v)
if err != nil {
return interval, err
}
interval = i
}
return interval, nil
}
**・sink.go (output processing)**
Create a process that truncates the value to the specified number of decimal places and writes it to the log.
The main flow is as follows.
Tuple
{
"type": "object",
"required": ["payload"],
"properties": {
"payload": {
"type": "object",
"required": ["value", "formula"],
"properties": {
"value": {
"type": "number"
},
"formula": {
"type": "string"
}
}
}
}
}
I created sink.go as follows.
sink.go
package sample
import (
"fmt"
"math"
"gopkg.in/sensorbee/sensorbee.v0/bql"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type sink struct {
decimal int
}
func (s *sink) Write(ctx *core.Context, tuple *core.Tuple) error {
p, ok := tuple.Data["payload"]
if !ok {
return fmt.Errorf("the tuple doesn't have the required field: payload")
}
payload, err := data.AsMap(p)
if err != nil {
return err
}
v, ok := payload["value"]
if !ok {
return fmt.Errorf("the tuple doesn't have the required field: value")
}
value, err := data.AsFloat(v)
if err != nil {
return err
}
f, ok := payload["formula"]
if !ok {
return fmt.Errorf("the tuple doesn't have the required field: formula")
}
formula, err := data.AsString(f)
if err != nil {
return err
}
shift := math.Pow(10, float64(s.decimal))
value = math.Floor(value*shift) / shift
ctx.Log().Infof("formula: %s", formula)
ctx.Log().Infof("value: %f", value)
return nil
}
func (s *sink) Close(ctx *core.Context) error {
return nil
}
func CreateSink(ctx *core.Context, ioParams *bql.IOParams, params data.Map) (core.Sink, error) {
decimal, err := getDecimal(params)
if err != nil {
return nil, err
}
return &sink{
decimal: decimal,
}, nil
}
func getDecimal(params data.Map) (int, error) {
node, ok := params["decimal"]
if !ok {
return 0, fmt.Errorf("decimal is required")
}
decimal, err := data.AsInt(node)
if err != nil {
return 0, fmt.Errorf("decimal must be a int:%s", err)
}
return int(decimal), nil
}
**・udsf.go (input/output conversion processing)**
The udsf object receives the following parameters:
- String parameters: stream_name (name of the input stream), operator
- Floating-point parameter: initial_value

For each Tuple it receives, the udsf object applies the specified operator to its current value (initial_value at the start) and the Tuple's value element, and keeps the result as the new current value.
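The running calculation described above can be sketched without the SDK as a small accumulator. The type `accumulator` and its method `apply` are hypothetical names for this illustration; only the operator names (plus, minus, times, divided) come from this article.

```go
package main

import (
	"errors"
	"fmt"
)

// accumulator keeps the current value and the operator to apply.
type accumulator struct {
	cur float64
	op  string
}

// apply combines the current value with the incoming value and stores
// the result as the new current value.
func (a *accumulator) apply(value float64) (float64, error) {
	switch a.op {
	case "plus":
		a.cur += value
	case "minus":
		a.cur -= value
	case "times":
		a.cur *= value
	case "divided":
		a.cur /= value
	default:
		return a.cur, errors.New("invalid operator: " + a.op)
	}
	return a.cur, nil
}

func main() {
	acc := &accumulator{cur: 10, op: "minus"} // initial_value = 10
	for _, v := range []float64{1, 2.5} {
		cur, err := acc.apply(v)
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Println(cur)
	}
}
```

Because the state lives in the object, each result depends on every Tuple received so far, not only on the latest one.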
I created udsf.go as follows.
udsf.go
package sample
import (
"fmt"
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type operator byte
const (
none = ' '
plus = '+'
minus = '-'
times = '*'
divided = '/'
)
type udsf struct {
cur float64
ope operator
}
func (u *udsf) Process(ctx *core.Context, tuple *core.Tuple, w core.Writer) error {
p, ok := tuple.Data["payload"]
if !ok {
return fmt.Errorf("the tuple doesn't have the required field: payload")
}
payload, err := data.AsMap(p)
if err != nil {
return err
}
v, ok := payload["value"]
if !ok {
return fmt.Errorf("the tuple doesn't have the required field: value")
}
value, err := data.AsFloat(v)
if err != nil {
return err
}
var formula string
newVal := u.cur
switch u.ope {
case plus:
newVal += value
case minus:
newVal -= value
case times:
newVal *= value
case divided:
newVal /= value
}
formula = fmt.Sprintf("%f %s %f", u.cur, string(u.ope), value)
ctx.Log().Debug("calculate: " + formula)
m := data.Map{
"value": data.Float(newVal),
"formula": data.String(formula),
}
if err := w.Write(ctx, core.NewTuple(data.Map{"payload": m})); err != nil {
return err
}
u.cur = newVal
return nil
}
func (u *udsf) Terminate(ctx *core.Context) error {
return nil
}
func CreateUDSF(decl udf.UDSFDeclarer, params data.Map) (udf.UDSF, error) {
inputStream, err := getStreamName(params)
if err != nil {
return nil, err
}
operator, err := getOperator(params)
if err != nil {
return nil, err
}
initialValue, err := getInitialValue(params)
if err != nil {
return nil, err
}
if err := decl.Input(inputStream, nil); err != nil {
return nil, err
}
return &udsf{
ope: operator,
cur: initialValue,
}, nil
}
func getStreamName(params data.Map) (string, error) {
node, ok := params["stream_name"]
if !ok {
return "", fmt.Errorf("stream_name is required")
}
streamName, err := data.AsString(node)
if err != nil {
return "", fmt.Errorf("stream_name must be a string:%s", err)
}
return streamName, nil
}
func getOperator(params data.Map) (operator, error) {
node, ok := params["operator"]
if !ok {
return none, fmt.Errorf("operator is required")
}
operatorStr, err := data.AsString(node)
if err != nil {
return none, fmt.Errorf("operator must be a string:%s", err)
}
switch operatorStr {
case "plus":
return plus, nil
case "minus":
return minus, nil
case "times":
return times, nil
case "divided":
return divided, nil
default:
return none, fmt.Errorf("invalid operator")
}
}
func getInitialValue(params data.Map) (float64, error) {
initialValue := 0.0
node, ok := params["initial_value"]
if !ok {
return initialValue, nil
}
initialValue, err := data.AsFloat(node)
if err != nil {
return initialValue, fmt.Errorf("initial_value is invalid")
}
return initialValue, nil
}
**Registering the source files**
Create plugin.go in the plugin directory and implement the process that registers the Source, Sink, and UDSF operations so that they can be used from BQL.
I created plugin.go as follows.
plugin.go
package plugin
import (
"github.sis.saison.co.jp/sherpa/es-agent/sample"
"gopkg.in/sensorbee/sensorbee.v0/bql"
"gopkg.in/sensorbee/sensorbee.v0/bql/udf"
)
func init() {
bql.MustRegisterGlobalSourceCreator("sample_source", bql.SourceCreatorFunc(sample.CreateSource))
bql.MustRegisterGlobalSinkCreator("sample_sink", bql.SinkCreatorFunc(sample.CreateSink))
udf.MustRegisterGlobalUDSFCreator("sample_udsf", udf.MustConvertToUDSFCreator(sample.CreateUDSF))
}
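Registration inside init() follows a common Go pattern: a package-level registry is filled when the package is loaded, so importing the package is enough to make its types available by name. The sketch below illustrates the pattern with the standard library only. It is not how SensorBee implements its registry; `creator`, `registry`, and `global` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// creator is a hypothetical factory signature; the real SDK types differ.
type creator func(params map[string]string) (string, error)

// registry maps a type name (as it would appear in BQL) to its creator.
type registry struct {
	mu       sync.RWMutex
	creators map[string]creator
}

func newRegistry() *registry {
	return &registry{creators: make(map[string]creator)}
}

// register rejects duplicate names so that two plug-ins cannot silently
// overwrite each other.
func (r *registry) register(name string, c creator) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.creators[name]; exists {
		return fmt.Errorf("creator %q is already registered", name)
	}
	r.creators[name] = c
	return nil
}

// mustRegister panics on failure, which is acceptable during start-up.
func (r *registry) mustRegister(name string, c creator) {
	if err := r.register(name, c); err != nil {
		panic(err)
	}
}

// lookup returns the creator registered under name, if any.
func (r *registry) lookup(name string) (creator, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	c, ok := r.creators[name]
	return c, ok
}

// global is filled in by init, so loading the package is enough.
var global = newRegistry()

func init() {
	global.mustRegister("sample_source", func(params map[string]string) (string, error) {
		return "source with interval " + params["interval"], nil
	})
}

func main() {
	if c, ok := global.lookup("sample_source"); ok {
		desc, _ := c(map[string]string{"interval": "1s"})
		fmt.Println(desc)
	}
}
```

This is also why a blank import (an import prefixed with `_`) of the plugin package is sufficient: its init() runs as a side effect of the import.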
**Creating the main function**
Finally, create plugin_main.go in the external directory and implement the main function, which runs the plug-in as a single executable module.
I created plugin_main.go as follows.
plugin_main.go
package main
import (
"os"
"github.sis.saison.co.jp/sherpa/es-agent/external/plugin"
_ "github.sis.saison.co.jp/sherpa/es-agent/sample/plugin"
)
func main() {
if err := plugin.NewServer().ListenAndServe(); err != nil {
os.Exit(1)
}
}
At this point, the preparation on the Runtime side is complete. Next, we will implement the development environment side.
Here, implement the generation of BQL (an SQL-like language) that tells the Runtime how to perform the streaming process.
**Environment preparation**
**Creating a module directory**
- Create the $SDK_HOME/dev/sample_adapter directory.
**Copying files**
- Copy build.xml and config.properties from the $SDK_HOME/dev/conf directory to the module directory.
$SDK_HOME/dev/conf/build.xml
⇒ $SDK_HOME/dev/sample_adapter/build.xml
$SDK_HOME/dev/conf/config.properties
⇒ $SDK_HOME/dev/sample_adapter/config.properties
Edit the copied config.properties file.
Implementation-Title=SampleAdapter
Implementation-Vendor=sugimon
Implementation-Version=0
module.category=Sample
module.label=Sample Plugin
display.name=Sample Plugin Adapter
plugin.name=sample_plugin
esagent.plugin.package=github.sis.saison.co.jp/sherpa/es-agent/sample
**Creating a directory for source files**
Create a src directory in $SDK_HOME/dev/sample_adapter.
($SDK_HOME/dev/sample_adapter/src)
Next, create the directories for the Java package com/appresso/ds/dp/modules/adapter/sample under src.
($SDK_HOME/dev/sample_adapter/src/com/appresso/ds/dp/modules/adapter/sample)
**Creating the source files**
Create source files with the following file names in the package directory.
・SampleAdapterModuleComponent.java
・SampleSinkOperationFactory.java
・SampleSourceOperationFactory.java
・SampleUDSFOperationFactory.java
├─ sample_adapter
│  │  build.xml
│  │  config.properties
│  ├─ src
│  │  └─ com
│  │     └─ appresso
│  │        └─ ds
│  │           └─ dp
│  │              └─ modules
│  │                 └─ adapter
│  │                    └─ sample
│  │                          SampleAdapterModuleComponent.java
│  │                          SampleSinkOperationFactory.java
│  │                          SampleSourceOperationFactory.java
│  │                          SampleUDSFOperationFactory.java
Let's create a source file for each of these as well.
**・SampleSourceOperationFactory.java (input processing)**
Returns an object that holds the properties of the Source operation, or the operation object itself. (Base class: BQLPluginSourceOperationFactory)
I created SampleSourceOperationFactory.java as follows.
SampleSourceOperationFactory.java
package com.appresso.ds.dp.modules.adapter.sample;
import com.appresso.ds.common.spi.constraint.NumberFillin;
import com.appresso.ds.common.spi.param.SimpleParameter;
import com.appresso.ds.common.xmlfw.xml.XmlHandler;
import com.appresso.ds.dp.share.adapter.bql.common.BQLPluginSourceOperationFactory;
import com.appresso.ds.dp.spi.OperationConfiguration;
import com.appresso.ds.dp.spi.OperationConfigurator;
import org.xml.sax.SAXException;
import static com.appresso.ds.common.bql.BQLSimpleParameterType.FLOAT;
public class SampleSourceOperationFactory extends BQLPluginSourceOperationFactory {
@Override
protected String getLabel() {
return "Sample source";
}
@Override
protected String getPluginName() {
return "sample_plugin";
}
@Override
public String getOperationName() {
return "sample_source";
}
@Override
protected String getTypeName() {
return "sample_source";
}
@Override
protected void setupOperationConfigurator(OperationConfigurator operationConfigurator) {
operationConfigurator.addSimpleParameter(createIntervalParameter());
}
@Override
protected void setupOutputSchema(XmlHandler handler, OperationConfiguration conf) throws Exception {
handler.startElement("", "payload", "payload", EMPTY_ATTRIBUTE);
writeElement(handler, "value");
handler.endElement("", "payload", "payload");
}
protected void writeElement(XmlHandler handler, String name) throws SAXException {
handler.startElement("", name, name, EMPTY_ATTRIBUTE);
handler.endElement("", name, name);
}
static SimpleParameter createIntervalParameter() {
NumberFillin fillin = new NumberFillin();
fillin.setMinValue(0.001);
fillin.setMaxValue(1314000);
fillin.setAllowMin(true);
fillin.setAllowMax(true);
fillin.setPrecision(10);
fillin.setDecimal(3);
fillin.setAllowDouble(true);
fillin.setLabel("Interval[sec]");
fillin.setRequired(true);
return new SimpleParameter(FLOAT.toParameterKey("interval"), fillin);
}
}
**・SampleSinkOperationFactory.java (output processing)**
Returns an object that holds the properties of the Sink operation, or the operation object itself. (Base class: BQLPluginOutputOperationFactory)
I created SampleSinkOperationFactory.java as follows.
SampleSinkOperationFactory.java
package com.appresso.ds.dp.modules.adapter.sample;
import com.appresso.ds.common.spi.constraint.NumberFillin;
import com.appresso.ds.common.spi.param.SimpleParameter;
import com.appresso.ds.common.xmlfw.xml.XmlHandler;
import com.appresso.ds.dp.share.adapter.bql.common.BQLPluginOutputOperationFactory;
import com.appresso.ds.dp.spi.OperationConfiguration;
import com.appresso.ds.dp.spi.OperationConfigurator;
import com.appresso.ds.dp.spi.OperationContext;
import org.xml.sax.SAXException;
import static com.appresso.ds.common.bql.BQLSimpleParameterType.INTEGER;
public class SampleSinkOperationFactory extends BQLPluginOutputOperationFactory {
@Override
protected String getLabel() {
return "Sample sink";
}
@Override
protected String getPluginName() {
return "sample_plugin";
}
@Override
public String getOperationName() {
return "sample_sink";
}
@Override
protected String getTypeName() {
return "sample_sink";
}
@Override
protected void setupOperationConfigurator(OperationConfigurator operationConfigurator) {
operationConfigurator.addSimpleParameter(createDecimalParameter());
}
protected void setupInputSchema(XmlHandler handler, OperationConfiguration conf, OperationContext context)
throws Exception {
handler.startElement("", "payload", "payload", EMPTY_ATTRIBUTE);
writeElement(handler, "formula");
writeElement(handler, "value");
handler.endElement("", "payload", "payload");
}
protected void writeElement(XmlHandler handler, String name) throws SAXException {
handler.startElement("", name, name, EMPTY_ATTRIBUTE);
handler.endElement("", name, name);
}
static SimpleParameter createDecimalParameter() {
NumberFillin fillin = new NumberFillin();
fillin.setMinValue(0);
fillin.setMaxValue(10);
fillin.setAllowMin(true);
fillin.setAllowMax(true);
fillin.setLabel("Decimal");
fillin.setRequired(true);
return new SimpleParameter(INTEGER.toParameterKey("decimal"), fillin);
}
}
**・SampleUDSFOperationFactory.java (input/output conversion processing)**
Returns an object that holds the properties of the UDSF operation, or the operation object itself. (Base class: BQLPluginUDSFOperationFactory)
I created SampleUDSFOperationFactory.java as follows.
SampleUDSFOperationFactory.java
package com.appresso.ds.dp.modules.adapter.sample;
import com.appresso.ds.common.bql.UDSFFromArgument;
import com.appresso.ds.common.bql.UDSFFromTemplate;
import com.appresso.ds.common.spi.constraint.Item;
import com.appresso.ds.common.spi.constraint.Multi;
import com.appresso.ds.common.spi.constraint.NumberFillin;
import com.appresso.ds.common.spi.param.SimpleParameter;
import com.appresso.ds.common.xmlfw.xml.XmlHandler;
import com.appresso.ds.dp.share.adapter.bql.common.BQLPluginUDSFOperationFactory;
import com.appresso.ds.dp.spi.OperationConfiguration;
import com.appresso.ds.dp.spi.OperationConfigurator;
import com.appresso.ds.dp.spi.OperationContext;
import org.xml.sax.SAXException;
import java.util.stream.Stream;
import static com.appresso.ds.common.bql.BQLSimpleParameterType.FLOAT;
import static com.appresso.ds.common.bql.BQLSimpleParameterType.STRING;
public class SampleUDSFOperationFactory extends BQLPluginUDSFOperationFactory {
@Override
protected String getLabel() {
return "Sample UDSF";
}
@Override
public String getPluginName() {
return "sample_plugin";
}
@Override
protected String getTypeName() {
return "sample_udsf";
}
@Override
public String getOperationName() {
return "sample_udsf";
}
@Override
protected void addArgs(UDSFFromTemplate template) {
template.addArg(new UDSFFromArgument(STRING.toParameterKey("operator")));
template.addArg(new UDSFFromArgument(FLOAT.toParameterKey("initial_value")));
}
@Override
protected void setupOperationConfigurator(OperationConfigurator operationConfigurator) {
setStreamConfigurationParameter(operationConfigurator);
operationConfigurator.addSimpleParameter(createOperatorParameter());
operationConfigurator.addSimpleParameter(createInitialValueParameter());
}
@Override
protected void setupInputSchema(XmlHandler handler, OperationConfiguration conf, OperationContext context)
throws Exception {
handler.startElement("", "payload", "payload", EMPTY_ATTRIBUTE);
writeElement(handler, "value");
handler.endElement("", "payload", "payload");
}
@Override
protected void setupOutputSchema(XmlHandler handler, OperationConfiguration conf, OperationContext context)
throws Exception {
handler.startElement("", "payload", "payload", EMPTY_ATTRIBUTE);
writeElement(handler, "formula");
writeElement(handler, "value");
handler.endElement("", "payload", "payload");
}
protected void writeElement(XmlHandler handler, String name) throws SAXException {
handler.startElement("", name, name, EMPTY_ATTRIBUTE);
handler.endElement("", name, name);
}
static SimpleParameter createInitialValueParameter() {
NumberFillin fillin = new NumberFillin();
fillin.setPrecision(10);
fillin.setDecimal(3);
fillin.setAllowDouble(true);
fillin.setLabel("Initial value");
fillin.setRequired(true);
return new SimpleParameter(FLOAT.toParameterKey("initial_value"), fillin);
}
static SimpleParameter createOperatorParameter(){
Multi multi = new Multi(Operator.getItems());
multi.setLabel("Operator");
multi.setRequired(true);
SimpleParameter param = new SimpleParameter(STRING.toParameterKey("operator"), multi);
return param;
}
enum Operator {
Plus("+","plus"),
Minus("-","minus"),
Times("*","times"),
Divided("/","divided");
public String getDisplayName() {
return displayName;
}
public String getValue() {
return value;
}
private final String displayName;
private final String value;
private Operator(String displayName, String value) {
this.displayName = displayName;
this.value=value;
}
Item toItem(){
return new Item(value,displayName);
}
static Item[] getItems(){
return Stream.of(Operator.values()).map(s->s.toItem()).toArray(Item[]::new);
}
}
}
**・SampleAdapterModuleComponent.java**
A class that represents the component (adapter).
I created SampleAdapterModuleComponent.java as follows.
SampleAdapterModuleComponent.java
package com.appresso.ds.dp.modules.adapter.sample;
import java.util.ArrayList;
import java.util.List;
import com.appresso.ds.common.kernel.modules.LicenseManager;
import com.appresso.ds.common.license.LicensePackageType;
import com.appresso.ds.dp.spi.AdapterModuleComponent;
import com.appresso.ds.dp.spi.OperationFactory;
import com.appresso.ds.dp.spi.ResourceFactory;
public class SampleAdapterModuleComponent extends AdapterModuleComponent {
private static final String MODULE_COMPONENT_NAME = "Sample Adapter";
@Override
public OperationFactory[] getOperationFactories() throws Exception {
List<OperationFactory> operationFactories = new ArrayList<>();
operationFactories.add(new SampleSourceOperationFactory());
operationFactories.add(new SampleUDSFOperationFactory());
operationFactories.add(new SampleSinkOperationFactory());
return operationFactories.toArray(new OperationFactory[operationFactories.size()]);
}
@Override
public ResourceFactory[] getResourceFactories() throws Exception {
return new ResourceFactory[]{};
}
public void checkLicense() throws Exception {
LicenseManager licenseManager = getContext().getProxy(LicenseManager.class);
licenseManager.checkLicense(getModuleComponentName(), getPermittedPackageType());
}
private String getModuleComponentName() {
return MODULE_COMPONENT_NAME;
}
private int[] getPermittedPackageType() {
return new int[]{LicensePackageType.TYPE_BASIC_SERVER};
}
}
This time, I actually implemented the processing separately for the execution environment side and the development environment side. This completes the plug-in creation process. Next time I would like to build and run these.
In this blog, I will continue to introduce the consultations received at the technical "Yorozu Consultation Counter" and the tips that came out of them.
Please continue to check it out and follow us if you like.
See you again!