原標(biāo)題:大數(shù)據(jù)訓(xùn)練:Flink窗口開始時間預(yù)測(數(shù)據(jù)為往年僅供參考)的計算
我還記得我比較好次學(xué)習(xí)flink的時候,嗶哩嗶哩的老師說Flink窗口的開始時間預(yù)測(數(shù)據(jù)為往年僅供參考)和結(jié)束時間預(yù)測(數(shù)據(jù)為往年僅供參考)和你想象的不一樣。當(dāng)時我好像記得老師說flink的窗口大小會根據(jù)你的時間預(yù)測(數(shù)據(jù)為往年僅供參考)單位來修改。
現(xiàn)在很多人還不太了解窗口機(jī)制和水印。更不用說什么時候窗口會結(jié)束。所以,今天從源代碼的角度,我們來普及一下窗口什么時候開始,什么時候結(jié)束。
我們可以寫一個簡單的代碼來看看效果。我習(xí)慣用java寫flink,所以用java。
@覆蓋
公共集合assignWindows(Object元素,長時間預(yù)測(數(shù)據(jù)為往年僅供參考)戳,WindowAssignerContext上下文){
if(時間預(yù)測(數(shù)據(jù)為往年僅供參考)戳> Long。最小值){
list windows = new ArrayList((int)(size/slide));
//獲取窗口開始時間預(yù)測(數(shù)據(jù)為往年僅供參考)
long lastStart = time window . getwindowstartwithcoffset(時間預(yù)測(數(shù)據(jù)為往年僅供參考)戳,偏移量,幻燈片);
for(long start = last start;
開始>時間預(yù)測(數(shù)據(jù)為往年僅供參考)戳-大小;
開始-=滑動){
windows . add(new time window(start,start+size));
}
返回窗口;
} else {
拋出new RuntimeException("記錄有長。MIN_VALUE時間預(yù)測(數(shù)據(jù)為往年僅供參考)戳(=無時間預(yù)測(數(shù)據(jù)為往年僅供參考)戳標(biāo)記)?!?
“時間預(yù)測(數(shù)據(jù)為往年僅供參考)特性是否設(shè)置為‘processing time’,或者您是否忘記調(diào)用“+
" ' datastream . assigntimestampsandwatermarks(...)'?");
}
}
public static long getwindowstartwithcoffset(長時間預(yù)測(數(shù)據(jù)為往年僅供參考)戳、長偏移量、長窗口大小){
//窗口開始計算的時間預(yù)測(數(shù)據(jù)為往年僅供參考)
返回時間預(yù)測(數(shù)據(jù)為往年僅供參考)戳-(時間預(yù)測(數(shù)據(jù)為往年僅供參考)戳偏移量+窗口大小)%窗口大??;
}
我們可以看到,窗口開始時間預(yù)測(數(shù)據(jù)為往年僅供參考)是取模后的時間預(yù)測(數(shù)據(jù)為往年僅供參考)。下面簡單分析一下。
如果我們比較好條數(shù)據(jù)的時間預(yù)測(數(shù)據(jù)為往年僅供參考)戳是1000,偏移量暫時不需要管理,【關(guān)注尚硅谷,輕松學(xué)】因為是時間預(yù)測(數(shù)據(jù)為往年僅供參考)的偏移量,比如東八區(qū)什么的。如果我們的窗口大小是5s,
那么接下來的公式計算就是1000-(1000-0+5000)% 5000,那么我們可以計算的結(jié)果就是0,也就是說窗口開始時間預(yù)測(數(shù)據(jù)為往年僅供參考)是0。更大的時間預(yù)測(數(shù)據(jù)為往年僅供參考)窗口大小,你可以自己算。
也就是說開始時間預(yù)測(數(shù)據(jù)為往年僅供參考)是0,結(jié)束時間預(yù)測(數(shù)據(jù)為往年僅供參考)窗口是4999,因為計算會被5000觸發(fā)。然后,atguigu,我們將驗證它是否與我們的分析一致。
接下來我們寫一個簡單的字?jǐn)?shù)統(tǒng)計,因為在多并行中不太好分析,所以設(shè)置為單并行讀取。
/**
* @作者:青芝.吳
* @日期:往年1月18日晚上8點36分
*/
公共類WindowSizeTest {
公共靜態(tài)void main(String[] args)引發(fā)異常{
StreamExecutionEnvironment env = StreamExecutionEnvironment . getexecutionenvironment();
env . setstreamtimecharacter istic(time character istic。event time);
datastream source source = env . socket text stream(" localhost ",9999);
SingleOutputStreamOperator source 1 = source . assigntimestampsandwatermarks(new boundedoutofordernessestimestampextractor(time . of(0,TimeUnit。毫秒)){
@覆蓋
公共long extractTimestamp(字符串){
返回Long.parseLong(s.split(","[0]);
}
});
SingleOutputStreamOperator map = source 1 . map(新映射函數(shù)>(){
@覆蓋
公共元組2映射(字符串s)引發(fā)異常{
return Tuple2.of(s.split(","[1],1);
}
});
windowed stream window = map . key by(0)。window(tumblingeventimewindows . of(time . of(5000,時間預(yù)測(數(shù)據(jù)為往年僅供參考)單位。毫秒)));
window.sum(1)。print();
env . execute();
}
}
接下來看看我們的分析是否正確。
顯然是正確的。你能計算***的窗口大小嗎?
文章來自一個胖子的大數(shù)據(jù)之路。
推薦閱讀:
用于大數(shù)據(jù)開發(fā)的Flink+TiDB
大數(shù)據(jù)訓(xùn)練:Flink調(diào)度器的性能提升
大數(shù)據(jù)訓(xùn)練:flink使用hive udf的原因分析。回搜狐多看看。
負(fù)責(zé)編輯:
文章標(biāo)題:大數(shù)據(jù)培訓(xùn):Flink窗口的開始時間預(yù)測(數(shù)據(jù)為往年僅供參考)的計算
本文地址:http://balticsea-crewing.com/show-85655.html
本文由合作方發(fā)布,不代表中職學(xué)校招生網(wǎng)_55px.com.cn立場,轉(zhuǎn)載聯(lián)系作者并注明出處:中職學(xué)校招生網(wǎng)_55px.com.cn
免責(zé)聲明:本文僅代表文章作者的個人觀點,與本站無關(guān)。其原創(chuàng)性、真實性以及文中陳述文字和內(nèi)容未經(jīng)本站證實,請讀者僅作參考,并自行核實相關(guān)內(nèi)容。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請聯(lián)系郵箱:dashenkeji8@163.com,我們將在第 一 時 間進(jìn)行核實處理。軟文/友鏈/推廣/廣告合作也可以聯(lián)系我。