Menu Zamknij

Czas w Esperze: kiedy warto wziąć go we własne ręce

Kiedy czas systemowy wystarcza

Esper domyślnie opiera się na zegarze systemowym. Dla aplikacji przetwarzających zdarzenia na bieżąco – strumień transakcji, dane IoT, logi w czasie rzeczywistym – to podejście jest naturalne i w pełni wystarczające. Problem pojawia się wtedy, gdy chcemy przetwarzać dane historyczne lub testować zapytania na danych spreparowanych. Wtedy zegar systemowy przestaje nam pomagać, a zaczyna przeszkadzać.

Jeśli przetwarzamy dane historyczne ale nasze zapytania EPL bazują na #ext_timed lub #ext_timed_batch, to Esper wyznacza granice okien na podstawie znacznika czasowego wbudowanego w samo zdarzenie – i zegar systemowy w ogóle nie wchodzi tu w grę. Możemy spokojnie pisać:

select spolka, avg(kursOtwarcia)
from KursAkcji#ext_timed(data.getTime(), 7 days)
group by spolka

Esper nie patrzy wtedy na zegarek – patrzy na wartość data w każdym zdarzeniu. To eleganckie i niezawodne rozwiązanie dla okien opartych na atrybucie zdarzenia.

Kiedy czas systemowy zaczyna przeszkadzać

Nie wszystkie konstrukcje EPL mają swój odpowiednik ext_timed. Wzorce (pattern) korzystające z timer:interval, timer:within czy timer:at, a także zwykłe okna #time oraz klauzula output (output every 3 seconds, output every 1 minutes itp.) – wszystkie te elementy opierają się wyłącznie na wewnętrznym zegarze Espera. Nie istnieje dla nich wersja „ext”, która pozwoliłaby podać własny znacznik czasowy.

Wyobraźmy sobie zapytanie wykrywające sytuację, w której po kursie powyżej 100 nie pojawił się żaden kurs powyżej 95 przez więcej niż 3 dni:

@name('alert')
pattern [
    every a=KursAkcji(kursOtwarcia > 100)
    -> (timer:interval(3 days) and not KursAkcji(kursOtwarcia > 95))
]

timer:interval(3 days) pyta wprost wewnętrzny zegar Espera: „czy minęły 3 dni?”. Gdy uruchomimy to zapytanie na danych historycznych z 2001 roku, a zegar systemowy pokazuje 2025 – wzorzec nigdy nie wyzwoli się poprawnie. Esper siedzi w roku 2025 i czeka na 3 doby liczone od teraz, a nie od znacznika w danych.

Ten sam problem dotyczy okien #time:

select spolka, avg(kursOtwarcia)
from KursAkcji#time(3 days)
group by spolka

Przy danych historycznych zdarzenia trafiają do okna, ale bardzo długo z niego nie wypadają – bo zegar systemowy idzie do przodu w tempie rzeczywistym, nie w tempie danych.

To samo dotyczy klauzuli output:

select spolka, avg(kursOtwarcia)
from KursAkcji#time(7 days)
group by spolka
output every 1 days

W powyższym przykładzie musielibyśmy czekać cały dzień na pierwszy z wyników.

Dane historyczne i środowisko testowe

Drugi ważny przypadek to testowanie. Nawet jeśli docelowo aplikacja będzie działać w czasie rzeczywistym, podczas tworzenia i weryfikowania zapytań operujemy na zbiorach przygotowanych danych. Chcemy sprawdzić, czy okno 7-dniowe poprawnie wygasa, czy wzorzec czasowy wyzwala się we właściwym momencie – i chcemy to sprawdzić teraz, nie czekając 7 prawdziwych dni.

Rozwiązanie: wyłącz timer i weź czas we własne ręce

Esper oferuje do tego dedykowany mechanizm. Wystarczą dwie rzeczy.

1. Wyłączenie wewnętrznego timera przy konfiguracji:

Configuration config = new Configuration();
config.getRuntime().getThreading().setInternalTimerEnabled(false);

Od tej chwili Esper nie pyta systemu operacyjnego o czas. Jego wewnętrzny zegar stoi w miejscu dopóki sami go nie przesuniemy.

2. Ustawianie czasu przed wysyłką zdarzeń:

Pole data obecne w każdym zdarzeniu niesie informację o tym, w którym momencie logicznym zdarzenie powinno być przetworzone. Przed wysyłką każdego zdarzenia odczytujemy tę wartość i przesuwamy zegar Espera – ale tylko wtedy, gdy data się zmieniła względem poprzedniego zdarzenia (zdarzenia z tej samej sesji giełdowej mają identyczną datę i są dla Espera równoczesne z punktu widzenia czasu):

long previousTime = Long.MIN_VALUE;
for (String json : events) {
    String dataStr = Json.parse(json).asObject().getString("data", null);
    long eventTime = Timestamp.valueOf(dataStr).getTime();
    if (eventTime != previousTime) {
        runtime.getEventService().advanceTime(eventTime);
        previousTime = eventTime;
    }
    runtime.getEventService().sendEventJson(json, "KursAkcji");
}

Całe tygodnie danych historycznych możemy „przewinąć” w ułamku sekundy, a timer:interval(3 days) wyzwoli się dokładnie wtedy, gdy wynika to z pola data, a nie z zegara ściennego.

Okna oparte na liczbie zdarzeń (#length, #length_batch) działają niezależnie od zegara i przetwarzają każde zdarzenie indywidualnie.

Jak to wygląda w praktyce

Poniższy kompletny przykład pokazuje zapytanie z oknem #time(3 days) na danych historycznych z 2001 roku. Bez sterowania czasem zdarzenia trafiałyby do okna, ale nie wypadały z niego przez zegarowe 3 dni (dekady w czasie z naszych danych) – bo zegar systemowy idzie do przodu w tempie rzeczywistym. Dzięki advanceTime Esper żyje w roku 2001 i okno działa dokładnie tak jak powinno.

Configuration config = new Configuration();
config.getRuntime().getThreading().setInternalTimerEnabled(false);

EPCompiler compiler = EPCompilerProvider.getCompiler();
EPCompiled compiled = compiler.compile("""
        @public @buseventtype
        create json schema KursAkcji(spolka string, kursOtwarcia double, data string);

        @name('avg3days')
        select spolka, avg(kursOtwarcia) as srednia, count(*) as liczba
        from KursAkcji#time(3 days)
        group by spolka
        output snapshot every 1 events;
        """, new CompilerArguments(config));

EPRuntime runtime = EPRuntimeProvider.getRuntime("demo", config);
EPDeployment deployment = runtime.getDeploymentService().deploy(compiled);

runtime.getDeploymentService()
        .getStatement(deployment.getDeploymentId(), "avg3days")
        .addListener((newData, oldData, stmt, rt) -> {
            for (EventBean eb : newData) {
                System.out.printf("%-8s  srednia=%.2f  w oknie=%s zdarzen%n",
                        eb.get("spolka"),
                        ((Number) eb.get("srednia")).doubleValue(),
                        eb.get("liczba"));
            }
        });

String[] data = {
    // Sesja 2001-09-04
    "{\"spolka\":\"Apple\",\"kursOtwarcia\":18.50,\"data\":\"2001-09-04 00:00:00.0\"}",
    "{\"spolka\":\"IBM\",  \"kursOtwarcia\":100.15,\"data\":\"2001-09-04 00:00:00.0\"}",
    // Sesja 2001-09-05  (+1 dzień)
    "{\"spolka\":\"Apple\",\"kursOtwarcia\":18.24,\"data\":\"2001-09-05 00:00:00.0\"}",
    "{\"spolka\":\"IBM\",  \"kursOtwarcia\":101.50,\"data\":\"2001-09-05 00:00:00.0\"}",
    // Sesja 2001-09-07  (+3 dni od sesji pierwszej -> sesja 04.09 wypada z okna)
    "{\"spolka\":\"Apple\",\"kursOtwarcia\":17.50,\"data\":\"2001-09-07 00:00:00.0\"}",
    "{\"spolka\":\"IBM\",  \"kursOtwarcia\":97.90, \"data\":\"2001-09-07 00:00:00.0\"}",
};

long previousTime = Long.MIN_VALUE;
for (String json : data) {
    String dataStr = Json.parse(json).asObject().getString("data", null);
    long eventTime = Timestamp.valueOf(dataStr).getTime();
    if (eventTime != previousTime) {
        runtime.getEventService().advanceTime(eventTime);
        System.out.println("\n>>> " + dataStr);
        previousTime = eventTime;
    }
    runtime.getEventService().sendEventJson(json, "KursAkcji");
}

runtime.destroy();

Oczekiwany wynik pokazuje, że sesja z 4 września wypada z okna dokładnie wtedy, gdy advanceTime przesunie zegar do 7 września – bo minęły właśnie 3 doby liczone w czasie danych, nie w czasie rzeczywistym:

>>> 2001-09-04 00:00:00.0
Apple     srednia=18.50  w oknie=1 zdarzen
IBM       srednia=100.15 w oknie=1 zdarzen

>>> 2001-09-05 00:00:00.0
Apple     srednia=18.37  w oknie=2 zdarzen
IBM       srednia=100.83 w oknie=2 zdarzen

>>> 2001-09-07 00:00:00.0
Apple     srednia=17.87  w oknie=2 zdarzen   ← sesja 04.09 wypadła z okna
IBM       srednia=99.70  w oknie=2 zdarzen

Podsumowanie

Czas systemowy w Esperze jest wygodny i wystarczający dla aplikacji przetwarzających zdarzenia na żywo, gdy okna opierają się na #ext_timed. Gdy jednak wchodzimy w obszar danych historycznych, testowania na danych spreparowanych lub używamy wzorców z timer:interval i timer:within – zegar systemowy staje się przeszkodą. advanceTime sterowany polem data ze zdarzenia to prosty i elegancki sposób, by oddać kontrolę nad czasem w ręce danych.

Tekst powstał przy współpracy z Claude (Anthropic). Treść została poprawiona, zredagowana i zweryfikowana merytorycznie przez autora.