Source code for punchpipe.flows.tests.test_replay

import os
from datetime import datetime

import pandas as pd
import pytest

from punchpipe.cli import clean_replay

TEST_DIR = os.path.dirname(__file__)
CONFIG_PATH = os.path.join(TEST_DIR, "punchpipe_config.yaml")

[docs] def test_clean_replay(): input_file = os.path.join(TEST_DIR, "data/test_replay.csv") result = clean_replay(input_file, CONFIG_PATH, write=False, reference_date=datetime(2025,6,6)) assert isinstance(result, pd.DataFrame) assert len(result) == 34
[docs] def test_clean_replay_notime(): input_file = os.path.join(TEST_DIR, "data/test_replay.csv") result = clean_replay(input_file, CONFIG_PATH, write=False, window_in_days=None, reference_date=datetime(2025,6,6)) assert isinstance(result, pd.DataFrame) assert len(result) == 34
[docs] def test_clean_replay_empty(): input_file = os.path.join(TEST_DIR, "data/test_replay_empty.csv") result = clean_replay(input_file, CONFIG_PATH, write=False, reference_date=datetime(2025,6,6)) assert isinstance(result, pd.DataFrame) assert len(result) == 0
[docs] def test_clean_replay_one_request(): input_file = os.path.join(TEST_DIR, "data/test_replay_one.csv") result = clean_replay(input_file, CONFIG_PATH, write=False, reference_date=datetime(2025,6,6)) assert isinstance(result, pd.DataFrame) assert len(result) == 1
[docs] def test_clean_replay_connected_requests(): input_file = os.path.join(TEST_DIR, "data/test_replay_connected.csv") result = clean_replay(input_file, CONFIG_PATH, write=False, reference_date=datetime(2025,5,29)) assert isinstance(result, pd.DataFrame) assert len(result) == 1