mysql> SELECT BENCHMARK(1000000,AES_ENCRYPT('hello','goodbye'));
ERROR 4998 (HY000): [1900df69ae400000][192.168.200.77:8527][sharding_db]ERR-CODE: [PXC-4998][ERR_NOT_SUPPORT] function BENCHMARK not support yet!
接下来做些啥呢? 看看嵌套函数咋实现的。
这个嵌套函数可以实现
mysql> select GREATEST(max(1),min(2));
+--------------------------+
| GREATEST(max(1), min(2)) |
+--------------------------+
| 2 |
+--------------------------+
1 row in set (0.18 sec)
@Overridepublicvoiddrive(){TaskDescriptordescription=newTaskDescriptor();// Starting from the root's OptimizeGroup task.tasks.push(newOptimizeGroup(requireNonNull(planner.root,"planner.root"),planner.infCost));// Ensure materialized view roots get explored.// Note that implementation rules or enforcement rules are not applied// unless the mv is matched.exploreMaterializationRoots();try{// Iterates until the root is fully optimized.while(!tasks.isEmpty()){Tasktask=tasks.pop();description.log(task);task.perform();}}catch(VolcanoTimeoutExceptionex){LOGGER.warn("Volcano planning times out, cancels the subsequent optimization.");}}
publicclassScalarFunctionExpressionextendsAbstractExpression{privateList<IExpression>args;privateIScalarFunctionfunction;privateExprContextProvidercontextHolder;publicScalarFunctionExpression(){}publicList<IExpression>getArgs(){returnargs;}@OverridepublicObjecteval(Rowrow,ExecutionContextec){if(function!=null){Object[]actualArgs=newObject[args.size()];for(inti=0;i<args.size();i++){actualArgs[i]=args.get(i).eval(row,ec);}// function.setArgs(Arrays.asList(actualArgs));Objectresult=function.compute(actualArgs,ec);DataTypereturnType=function.getReturnType();returnreturnType.convertFrom(result);}else{GeneralUtil.nestedException("invoke function of null");}returnnull;}
publicclassBenchmarkextendsAbstractCallBackScalarFunction{publicBenchmark(List<DataType>operandTypes,DataTyperesultType){super(operandTypes,resultType);}@OverridepublicString[]getFunctionNames(){returnnewString[]{"BENCHMARK"};}@OverridepublicObjectcompute(Object[]args,ExecutionContextec){if(args[1]instanceofScalarFunctionExpression&&args[0]instanceofNumber){for(inti=0;i<((Number)args[0]).intValue();i++){((ScalarFunctionExpression)args[1]).eval(null,ec);}// TODO support args[1] is select}return0;}}
rel#1300:PhysicalProject.DRDS.[].any.[](input=LogicalCorrelate#1298,BENCHMARK(3, ( SELECT id FROM sbtest_sharding_c LIMIT 1 ))=BENCHMARK(?0, $1),variablesSet=[$cor0])
rel#651:PhysicalProject.DRDS.[].any.[](input=LogicalCorrelate#649,BENCHMARK(3, ( SELECT id FROM sbtest_sharding_c LIMIT 1 ))=BENCHMARK(?0, $1),variablesSet=[$cor0])
mysql> SELECT CONCAT(3, (SELECT id from sbtest_sharding_id limit 1));
+———————————————————-+
| CONCAT(3, ( SELECT id FROM sbtest_sharding_id LIMIT 1 )) |
+———————————————————-+
| 3100001 |
+———————————————————-+
1 row in set (8.99 sec)
看执行流程,似乎是 SELECT id from sbtest_sharding_id limit 1 被作为子查询,查询后再进行 concat 计算了。
rel#1069:PhysicalProject.DRDS.[].any.[](input=LogicalCorrelate#1067,CONCAT(4, ( SELECT id FROM sbtest_sharding_id LIMIT 1 ))=CONCAT(?0, $1),variablesSet=[$cor0])
@OverrideChunkdoNextChunk(){if(closed||isFinish){forceClose();returnnull;}if(currentChunk==null){// read next chunk from inputcurrentChunk=left.nextChunk();if(currentChunk==null){isFinish=left.produceIsFinished();blocked=left.produceIsBlocked();returnnull;}applyRowIndex=0;}if(hasConstantValue){for(;applyRowIndex<currentChunk.getPositionCount();applyRowIndex++){// apply of applyRowIndex's row is finished, compute resultfor(inti=0;i<left.getDataTypes().size();i++){currentChunk.getBlock(i).writePositionTo(applyRowIndex,blockBuilders[i]);}if(constantValue==RexDynamicParam.DYNAMIC_SPECIAL_VALUE.EMPTY){blockBuilders[getDataTypes().size()-1].writeObject(null);}else{blockBuilders[getDataTypes().size()-1].writeObject(outColumnType.convertFrom(constantValue));}}currentChunk=null;returnbuildChunkAndReset();}// 这里就是查询逻辑,也就是说修改后,是需要多次执行的if(curSubqueryApply==null){curSubqueryApply=createSubqueryApply(applyRowIndex);curSubqueryApply.prepare();}curSubqueryApply.process();if(curSubqueryApply.isFinished()){curSubqueryApply.close();if((leftConditions==null||leftConditions.size()==0)&&isValueConstant){constantValue=curSubqueryApply.getResultValue();hasConstantValue=true;}// apply of applyRowIndex's row is finished, compute resultfor(inti=0;i<left.getDataTypes().size();i++){currentChunk.getBlock(i).writePositionTo(applyRowIndex,blockBuilders[i]);}if(curSubqueryApply.getResultValue()==RexDynamicParam.DYNAMIC_SPECIAL_VALUE.EMPTY){blockBuilders[getDataTypes().size()-1].writeObject(null);}else{blockBuilders[getDataTypes().size()-1].writeObject(getDataTypes().get(getDataTypes().size()-1).convertFrom(curSubqueryApply.getResultValue()));}curSubqueryApply=null;if(++applyRowIndex==currentChunk.getPositionCount()){applyRowIndex=0;currentChunk=null;returnbuildChunkAndReset();}}else{blocked=curSubqueryApply.isBlocked();}returnnull;}
rel#651:PhysicalProject.DRDS.[].any.[](input=LogicalCorrelate#649,BENCHMARK(3, ( SELECT id FROM sbtest_sharding_c LIMIT 1 ))=BENCHMARK(?0, $1),variablesSet=[$cor0])