Last updated: Jan 17, 2024
You can use Extension Model nodes to generate an Extension model nugget. The scripting name of this model nugget is applyextension. For more information on scripting the modeling node itself, see extensionmodelnode properties.
Python for Spark example
script example for Python for Spark
applyModel = stream.findByType("extension_apply", None)
score_script = """
import json
import spss.pyspark.runtime
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.tree import DecisionTreeModel
from pyspark.sql.types import StringType, StructField
cxt = spss.pyspark.runtime.getContext()
if cxt.isComputeDataModelOnly():
_schema = cxt.getSparkInputSchema()
_schema.fields.append(StructField("Prediction", StringType(), nullable=True))
cxt.setSparkOutputSchema(_schema)
else:
df = cxt.getSparkInputData()
_modelPath = cxt.getModelContentToPath("TreeModel")
metadata = json.loads(cxt.getModelContentToString("model.dm"))
schema = df.dtypes[:]
target = "Drug"
predictors = ["Age","BP","Sex","Cholesterol","Na","K"]
lookup = {}
for i in range(0,len(schema)):
lookup[schema[i][0]] = i
def row2LabeledPoint(dm,lookup,target,predictors,row):
target_index = lookup[target]
tval = dm[target_index].index(row[target_index])
pvals = []
for predictor in predictors:
predictor_index = lookup[predictor]
if isinstance(dm[predictor_index],list):
pval = row[predictor_index] in dm[predictor_index] and dm[predictor_index].index(row[predictor_index]) or -1
else:
pval = row[predictor_index]
pvals.append(pval)
return LabeledPoint(tval, DenseVector(pvals))
# convert dataframe to an RDD containing LabeledPoint
lps = df.rdd.map(lambda row: row2LabeledPoint(metadata,lookup,target,predictors,row))
treeModel = DecisionTreeModel.load(cxt.getSparkContext(), _modelPath);
# score the model, produces an RDD containing just double values
predictions = treeModel.predict(lps.map(lambda lp: lp.features))
def addPrediction(x,dm,lookup,target):
result = []
for _idx in range(0, len(x[0])):
result.append(x[0][_idx])
result.append(dm[lookup[target]][int(x[1])])
return result
_schema = cxt.getSparkInputSchema()
_schema.fields.append(StructField("Prediction", StringType(), nullable=True))
rdd2 = df.rdd.zip(predictions).map(lambda x:addPrediction(x, metadata, lookup, target))
outDF = cxt.getSparkSQLContext().createDataFrame(rdd2, _schema)
cxt.setSparkOutputData(outDF)
"""
applyModel.setPropertyValue("python_syntax", score_script)
R example
script example for R
applyModel.setPropertyValue("r_syntax", """
result<-predict(modelerModel,newdata=modelerData)
modelerData<-cbind(modelerData,result)
var1<-c(fieldName="NaPrediction",fieldLabel="",fieldStorage="real",fieldMeasure="",
fieldFormat="",fieldRole="")
modelerDataModel<-data.frame(modelerDataModel,var1)""")
applyextension Properties |
Values | Property Description |
---|---|---|
r_syntax |
string | R scripting syntax for model scoring. |
python_syntax |
string | Python scripting syntax for model scoring. |
use_batch_size |
flag | Enable use of batch processing. |
batch_size |
integer | Specify the number of data records to be included in each batch. |
convert_flags |
StringsAndDoubles LogicalValues |
Option to convert flag fields. |
convert_missing |
flag | Option to convert missing values to the R NA value. |
convert_datetime |
flag | Option to convert variables with date or datetime formats to R date/time formats. |
convert_datetime_class |
POSIXct POSIXlt |
Options to specify to what format variables with date or datetime formats are converted. |