濮阳杆衣贸易有限公司

主頁(yè) > 知識(shí)庫(kù) > MLSQL Stack如何讓流調(diào)試更加簡(jiǎn)單詳解

MLSQL Stack如何讓流調(diào)試更加簡(jiǎn)單詳解

熱門標(biāo)簽:昌德訊外呼系統(tǒng) 中國(guó)地圖標(biāo)注公司 百度地圖標(biāo)注要什么軟件 天津公司外呼系統(tǒng)軟件 400電話申請(qǐng)廠家現(xiàn)貨 福建外呼電銷機(jī)器人加盟 電話機(jī)器人的價(jià)格多少錢一個(gè)月 自己做地圖標(biāo)注需要些什么 徐涇鎮(zhèn)騰訊地圖標(biāo)注

前言

有一位同學(xué)正在調(diào)研MLSQL Stack對(duì)流的支持。然后說(shuō)了流調(diào)試其實(shí)挺困難的。經(jīng)過(guò)實(shí)踐,希望實(shí)現(xiàn)如下三點(diǎn):

  • 能隨時(shí)查看最新固定條數(shù)的Kafka數(shù)據(jù)
  • 調(diào)試結(jié)果(sink)能打印在web控制臺(tái)
  • 流程序能自動(dòng)推測(cè)json schema(現(xiàn)在spark是不行的)

實(shí)現(xiàn)這三個(gè)點(diǎn)之后,我發(fā)現(xiàn)調(diào)試確實(shí)就變得簡(jiǎn)單很多了。

流程

首先我新建了一個(gè)kaf_write.mlsql,里面方便我往Kafka里寫數(shù)據(jù):

set abc='''
{ "x": 100, "y": 200, "z": 200 ,"dataType":"A group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
''';
load jsonStr.`abc` as table1;

select to_json(struct(*)) as value from table1 as table2;
save append table2 as kafka.`wow` where 
kafka.bootstrap.servers="127.0.0.1:9092";

這樣我每次運(yùn)行,數(shù)據(jù)就能寫入到Kafka.

接著,我寫完后,需要看看數(shù)據(jù)是不是真的都寫進(jìn)去了,寫成了什么樣子:

!kafkaTool sampleData 10 records from "127.0.0.1:9092" wow;

這句話表示,我要采樣Kafka 10條Kafka數(shù)據(jù),該Kafka的地址為127.0.0.1:9092,主題為wow.運(yùn)行結(jié)果如下:

沒(méi)有什么問(wèn)題。接著我寫一個(gè)非常簡(jiǎn)單的流式程序:

-- the stream name, should be uniq.
set streamName="streamExample";

-- use kafkaTool to infer schema from kafka
!kafkaTool registerSchema 2 records from "127.0.0.1:9092" wow;


load kafka.`wow` options 
kafka.bootstrap.servers="127.0.0.1:9092"
as newkafkatable1;


select * from newkafkatable1
as table21;


-- print in webConsole instead of terminal console.
save append table21 
as webConsole.`` 
options mode="Append"
and duration="15"
and checkpointLocation="/tmp/s-cpl4";

運(yùn)行結(jié)果如下:

在終端我們也可以看到實(shí)時(shí)效果了。

補(bǔ)充

當(dāng)然,MLSQL Stack 還有對(duì)流還有兩個(gè)特別好地方,第一個(gè)是你可以對(duì)流的事件設(shè)置http協(xié)議的callback,以及對(duì)流的處理結(jié)果再使用批SQL進(jìn)行處理,最后入庫(kù)。參看如下腳本:

-- the stream name, should be uniq.
set streamName="streamExample";


-- mock some data.
set data='''
{"key":"yes","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
''';

-- load data as table
load jsonStr.`data` as datasource;

-- convert table as stream source
load mockStream.`datasource` options 
stepSizeRange="0-3"
as newkafkatable1;

-- aggregation 
select cast(value as string) as k from newkafkatable1
as table21;


!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";
-- output the the result to console.


save append table21 
as custom.`` 
options mode="append"
and duration="15"
and sourceTable="jack"
and code='''
select count(*) as c from jack as newjack;
save append newjack as parquet.`/tmp/jack`; 
'''
and checkpointLocation="/tmp/cpl15";

總結(jié)

以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對(duì)腳本之家的支持。

您可能感興趣的文章:
  • Mysql LONGBLOB 類型存儲(chǔ)二進(jìn)制數(shù)據(jù) (修改+調(diào)試+整理)
  • Mysql LONGTEXT 類型存儲(chǔ)大文件(二進(jìn)制也可以) (修改+調(diào)試+整理)
  • Mysql 插入中文及中文查詢 (修改+調(diào)試)
  • 新手配置 PHP 調(diào)試環(huán)境(IIS+PHP+MYSQL)
  • MySQL UDF調(diào)試方式debugview的相關(guān)方法
  • 分享101個(gè)MySQL調(diào)試與優(yōu)化技巧
  • GDB調(diào)試Mysql實(shí)戰(zhàn)之源碼編譯安裝

標(biāo)簽:駐馬店 昌都 北京 黔西 梅河口 鄂爾多斯 荊門 陜西

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《MLSQL Stack如何讓流調(diào)試更加簡(jiǎn)單詳解》,本文關(guān)鍵詞  MLSQL,Stack,如何,讓,流,調(diào)試,;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問(wèn)題,煩請(qǐng)?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無(wú)關(guān)。
  • 相關(guān)文章
  • 下面列出與本文章《MLSQL Stack如何讓流調(diào)試更加簡(jiǎn)單詳解》相關(guān)的同類信息!
  • 本頁(yè)收集關(guān)于MLSQL Stack如何讓流調(diào)試更加簡(jiǎn)單詳解的相關(guān)信息資訊供網(wǎng)民參考!
  • 推薦文章
    静海县| 泽州县| 桂东县| 武川县| 榆树市| 曲周县| 当雄县| 石屏县| 海盐县| 改则县| 都昌县| 高碑店市| 嵊泗县| 平远县| 土默特左旗| 龙胜| 花垣县| 盘锦市| 修武县| 九龙城区| 诏安县| 瓮安县| 会理县| 宣威市| 伊吾县| 墨竹工卡县| 鲜城| 嘉荫县| 红桥区| 乌拉特中旗| 舒城县| 潞城市| 碌曲县| 榆中县| 厦门市| 教育| 全南县| 同仁县| 东源县| 甘洛县| 太保市|