Kiedy czas systemowy wystarcza
Esper domyślnie opiera się na zegarze systemowym. Dla aplikacji przetwarzających zdarzenia na bieżąco – strumień transakcji, dane IoT, logi w czasie rzeczywistym – to podejście jest naturalne i w pełni wystarczające. Problem pojawia się wtedy, gdy chcemy przetwarzać dane historyczne lub testować zapytania na danych spreparowanych. Wtedy zegar systemowy przestaje nam pomagać, a zaczyna przeszkadzać.
Jeśli przetwarzamy dane historyczne ale nasze zapytania EPL bazują na #ext_timed lub #ext_timed_batch, to Esper wyznacza granice okien na podstawie znacznika czasowego wbudowanego w samo zdarzenie – i zegar systemowy w ogóle nie wchodzi tu w grę. Możemy spokojnie pisać:
select spolka, avg(kursOtwarcia) from KursAkcji#ext_timed(data.getTime(), 7 days) group by spolka
Esper nie patrzy wtedy na zegarek – patrzy na wartość data w każdym zdarzeniu. To eleganckie i niezawodne rozwiązanie dla okien opartych na atrybucie zdarzenia.
Kiedy czas systemowy zaczyna przeszkadzać
Nie wszystkie konstrukcje EPL mają swój odpowiednik ext_timed. Wzorce (pattern) korzystające z timer:interval, timer:within czy timer:at, a także zwykłe okna #time oraz klauzula output (output every 3 seconds, output every 1 minutes itp.) – wszystkie te elementy opierają się wyłącznie na wewnętrznym zegarze Espera. Nie istnieje dla nich wersja „ext”, która pozwoliłaby podać własny znacznik czasowy.
Wyobraźmy sobie zapytanie wykrywające sytuację, w której po kursie powyżej 100 nie pojawił się żaden kurs powyżej 95 przez więcej niż 3 dni:
@name('alert')
pattern [
every a=KursAkcji(kursOtwarcia > 100)
-> (timer:interval(3 days) and not KursAkcji(kursOtwarcia > 95))
]
timer:interval(3 days) pyta wprost wewnętrzny zegar Espera: „czy minęły 3 dni?”. Gdy uruchomimy to zapytanie na danych historycznych z 2001 roku, a zegar systemowy pokazuje 2025 – wzorzec nigdy nie wyzwoli się poprawnie. Esper siedzi w roku 2025 i czeka na 3 doby liczone od teraz, a nie od znacznika w danych.
Ten sam problem dotyczy okien #time:
select spolka, avg(kursOtwarcia) from KursAkcji#time(3 days) group by spolka
Przy danych historycznych zdarzenia trafiają do okna, ale bardzo długo z niego nie wypadają – bo zegar systemowy idzie do przodu w tempie rzeczywistym, nie w tempie danych.
To samo dotyczy klauzuli output:
select spolka, avg(kursOtwarcia) from KursAkcji#time(7 days) group by spolka output every 1 days
W powyższym przykładzie musielibyśmy czekać cały dzień na pierwszy z wyników.
Dane historyczne i środowisko testowe
Drugi ważny przypadek to testowanie. Nawet jeśli docelowo aplikacja będzie działać w czasie rzeczywistym, podczas tworzenia i weryfikowania zapytań operujemy na zbiorach przygotowanych danych. Chcemy sprawdzić, czy okno 7-dniowe poprawnie wygasa, czy wzorzec czasowy wyzwala się we właściwym momencie – i chcemy to sprawdzić teraz, nie czekając 7 prawdziwych dni.
Rozwiązanie: wyłącz timer i weź czas we własne ręce
Esper oferuje do tego dedykowany mechanizm. Wystarczą dwie rzeczy.
1. Wyłączenie wewnętrznego timera przy konfiguracji:
Configuration config = new Configuration(); config.getRuntime().getThreading().setInternalTimerEnabled(false);
Od tej chwili Esper nie pyta systemu operacyjnego o czas. Jego wewnętrzny zegar stoi w miejscu dopóki sami go nie przesuniemy.
2. Ustawianie czasu przed wysyłką zdarzeń:
Pole data obecne w każdym zdarzeniu niesie informację o tym, w którym momencie logicznym zdarzenie powinno być przetworzone. Przed wysyłką każdego zdarzenia odczytujemy tę wartość i przesuwamy zegar Espera – ale tylko wtedy, gdy data się zmieniła względem poprzedniego zdarzenia (zdarzenia z tej samej sesji giełdowej mają identyczną datę i są dla Espera równoczesne z punktu widzenia czasu):
long previousTime = Long.MIN_VALUE;
for (String json : events) {
String dataStr = Json.parse(json).asObject().getString("data", null);
long eventTime = Timestamp.valueOf(dataStr).getTime();
if (eventTime != previousTime) {
runtime.getEventService().advanceTime(eventTime);
previousTime = eventTime;
}
runtime.getEventService().sendEventJson(json, "KursAkcji");
}
Całe tygodnie danych historycznych możemy „przewinąć” w ułamku sekundy, a timer:interval(3 days) wyzwoli się dokładnie wtedy, gdy wynika to z pola data, a nie z zegara ściennego.
Okna oparte na liczbie zdarzeń (
#length,#length_batch) działają niezależnie od zegara i przetwarzają każde zdarzenie indywidualnie.
Jak to wygląda w praktyce
Poniższy kompletny przykład pokazuje zapytanie z oknem #time(3 days) na danych historycznych z 2001 roku. Bez sterowania czasem zdarzenia trafiałyby do okna, ale nie wypadały z niego przez zegarowe 3 dni (dekady w czasie z naszych danych) – bo zegar systemowy idzie do przodu w tempie rzeczywistym. Dzięki advanceTime Esper żyje w roku 2001 i okno działa dokładnie tak jak powinno.
Configuration config = new Configuration();
config.getRuntime().getThreading().setInternalTimerEnabled(false);
EPCompiler compiler = EPCompilerProvider.getCompiler();
EPCompiled compiled = compiler.compile("""
@public @buseventtype
create json schema KursAkcji(spolka string, kursOtwarcia double, data string);
@name('avg3days')
select spolka, avg(kursOtwarcia) as srednia, count(*) as liczba
from KursAkcji#time(3 days)
group by spolka
output snapshot every 1 events;
""", new CompilerArguments(config));
EPRuntime runtime = EPRuntimeProvider.getRuntime("demo", config);
EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);
runtime.getDeploymentService()
.getStatement(deployment.getDeploymentId(), "avg3days")
.addListener((newData, oldData, stmt, rt) -> {
for (EventBean eb : newData) {
System.out.printf("%-8s srednia=%.2f w oknie=%s zdarzen%n",
eb.get("spolka"),
((Number) eb.get("srednia")).doubleValue(),
eb.get("liczba"));
}
});
String[] data = {
// Sesja 2001-09-04
"{\"spolka\":\"Apple\",\"kursOtwarcia\":18.50,\"data\":\"2001-09-04 00:00:00.0\"}",
"{\"spolka\":\"IBM\", \"kursOtwarcia\":100.15,\"data\":\"2001-09-04 00:00:00.0\"}",
// Sesja 2001-09-05 (+1 dzień)
"{\"spolka\":\"Apple\",\"kursOtwarcia\":18.24,\"data\":\"2001-09-05 00:00:00.0\"}",
"{\"spolka\":\"IBM\", \"kursOtwarcia\":101.50,\"data\":\"2001-09-05 00:00:00.0\"}",
// Sesja 2001-09-07 (+3 dni od sesji pierwszej -> sesja 04.09 wypada z okna)
"{\"spolka\":\"Apple\",\"kursOtwarcia\":17.50,\"data\":\"2001-09-07 00:00:00.0\"}",
"{\"spolka\":\"IBM\", \"kursOtwarcia\":97.90, \"data\":\"2001-09-07 00:00:00.0\"}",
};
long previousTime = Long.MIN_VALUE;
for (String json : data) {
String dataStr = Json.parse(json).asObject().getString("data", null);
long eventTime = Timestamp.valueOf(dataStr).getTime();
if (eventTime != previousTime) {
runtime.getEventService().advanceTime(eventTime);
System.out.println("\n>>> " + dataStr);
previousTime = eventTime;
}
runtime.getEventService().sendEventJson(json, "KursAkcji");
}
runtime.destroy();
Oczekiwany wynik pokazuje, że sesja z 4 września wypada z okna dokładnie wtedy, gdy advanceTime przesunie zegar do 7 września – bo minęły właśnie 3 doby liczone w czasie danych, nie w czasie rzeczywistym:
>>> 2001-09-04 00:00:00.0 Apple srednia=18.50 w oknie=1 zdarzen IBM srednia=100.15 w oknie=1 zdarzen >>> 2001-09-05 00:00:00.0 Apple srednia=18.37 w oknie=2 zdarzen IBM srednia=100.83 w oknie=2 zdarzen >>> 2001-09-07 00:00:00.0 Apple srednia=17.87 w oknie=2 zdarzen ← sesja 04.09 wypadła z okna IBM srednia=99.70 w oknie=2 zdarzen
Podsumowanie
Czas systemowy w Esperze jest wygodny i wystarczający dla aplikacji przetwarzających zdarzenia na żywo, gdy okna opierają się na #ext_timed. Gdy jednak wchodzimy w obszar danych historycznych, testowania na danych spreparowanych lub używamy wzorców z timer:interval i timer:within – zegar systemowy staje się przeszkodą. advanceTime sterowany polem data ze zdarzenia to prosty i elegancki sposób, by oddać kontrolę nad czasem w ręce danych.
Tekst powstał przy współpracy z Claude (Anthropic). Treść została poprawiona, zredagowana i zweryfikowana merytorycznie przez autora.