Source code for punchpipe.flows.stray_light
import json
import typing as t
from datetime import datetime, timedelta
from collections import defaultdict
from prefect import flow, get_run_logger
from punchbowl.level1.stray_light import estimate_polarized_stray_light, estimate_stray_light
from sqlalchemy import func
from punchpipe import __version__
from punchpipe.control.db import File, Flow
from punchpipe.control.processor import generic_process_flow_logic
from punchpipe.control.scheduler import generic_scheduler_flow_logic
from punchpipe.control.util import get_database_session, load_pipeline_configuration
from punchpipe.flows.level2 import group_l2_inputs_single_observatory
from punchpipe.flows.util import file_name_to_full_path
[docs]
def construct_clear_stray_light_check_for_inputs(session,
                                                 pipeline_config: dict,
                                                 reference_time: datetime,
                                                 reference_file: File):
    """Pick the Level 1 inputs for one clear stray light model, if it can be built now.

    A window of ``max_hours_per_half`` hours is searched on each side of
    ``reference_time``. In each half, the Level 1 files and the matching Level 0
    files nearest to ``reference_time`` are fetched for the observatory of
    ``reference_file`` (at most ``max_files_per_half`` per query, outliers excluded,
    state ``created`` or ``progressed``).

    Parameters
    ----------
    session
        Database session used for the queries.
    pipeline_config : dict
        Pipeline configuration; the ``flows.construct_stray_light`` section and
        ``file_version`` are read.
    reference_time : datetime
        Center of the search window.
    reference_file : File
        Placeholder record of the model. Its ``file_type`` must be one of
        ``SR``, ``SM``, ``SZ``, ``SP``. It is mutated in place (state set to
        ``"impossible"``) when the window is closed to new Level 0 data and
        either half holds fewer than ``min_files_per_half`` Level 0 files.

    Returns
    -------
    list
        ``file_id`` of every selected Level 1 file (first half followed by second
        half), or an empty list when the model should not be produced yet.
    """
    logger = get_run_logger()

    flow_config = pipeline_config['flows']['construct_stray_light']
    min_per_half = flow_config['min_files_per_half']
    max_per_half = flow_config['max_files_per_half']
    half_width = timedelta(hours=flow_config['max_hours_per_half'])
    window_start = reference_time - half_width
    window_end = reference_time + half_width

    # Once the window's end is older than this many days, no further L0 files are expected
    grace_period = timedelta(days=flow_config['new_L0_impossible_after_days'])
    more_L0_impossible = datetime.now() - window_end > grace_period

    # model type -> (Level 1 input type, Level 0 type the Level 1 input derives from)
    l1_type, l0_type = {
        "SR": ("XR", "CR"),
        "SM": ("YM", "PM"),
        "SZ": ("YZ", "PZ"),
        "SP": ("YP", "PP"),
    }[reference_file.file_type]

    candidates = (session.query(File)
                  .filter(File.state.in_(["created", "progressed"]))
                  .filter(File.observatory == reference_file.observatory)
                  .filter(~File.outlier))

    def _nearest(file_type, level, lower, upper, ordering):
        # Files of one type/level inside [lower, upper], capped at max_per_half
        return (candidates
                .filter(File.date_obs >= lower)
                .filter(File.date_obs <= upper)
                .filter(File.file_type == file_type)
                .filter(File.level == level)
                .order_by(ordering)
                .limit(max_per_half)
                .all())

    # Descending before the reference time and ascending after it, so that the
    # limit keeps the files closest to the reference time in both halves
    before_l1 = _nearest(l1_type, "1", window_start, reference_time, File.date_obs.desc())
    after_l1 = _nearest(l1_type, "1", reference_time, window_end, File.date_obs.asc())
    before_l0 = _nearest(l0_type, "0", window_start, reference_time, File.date_obs.desc())
    after_l0 = _nearest(l0_type, "0", reference_time, window_end, File.date_obs.asc())

    if more_L0_impossible:
        if len(before_l0) < min_per_half or len(after_l0) < min_per_half:
            reference_file.state = "impossible"
            # Record who deemed this to be impossible
            reference_file.file_version = pipeline_config["file_version"]
            reference_file.software_version = __version__
            reference_file.date_created = datetime.now()
            return []

        # Allow 5% of the L0s to not be processed, in case a few fail
        l1s_caught_up = (len(before_l1) >= 0.95 * len(before_l0)
                         and len(after_l1) >= 0.95 * len(after_l0))
        enough_l1s = len(before_l1) > min_per_half and len(after_l1) > min_per_half
        if not (l1s_caught_up and enough_l1s):
            return []

        # Use the same number of files on each side of the reference time
        balanced = min(len(before_l1), len(after_l1))
        before_l1 = before_l1[:balanced]
        after_l1 = after_l1[:balanced]
    elif len(before_l1) != max_per_half or len(after_l1) != max_per_half:
        # More L0s may still arrive, so only proceed once both halves are full
        return []

    selected = before_l1 + after_l1
    logger.info(f"{len(selected)} Level 1 {l1_type}{reference_file.observatory} files will be used "
                "for stray light estimation.")
    return [f.file_id for f in selected]
[docs]
def construct_polarized_stray_light_check_for_inputs(session,
                                                     pipeline_config: dict,
                                                     reference_time: datetime,
                                                     reference_files: list[File]):
    """Pick the Level 1 inputs for one set of polarized stray light models, if it can be built now.

    A window of ``max_hours_per_half`` hours is searched on each side of
    ``reference_time`` for Level 1 ``YP``/``YM``/``YZ`` files and Level 0
    ``PP``/``PM``/``PZ`` files of the observatory of ``reference_files[0]``
    (outliers excluded, state ``created`` or ``progressed``). The files of each
    half are grouped with ``group_l2_inputs_single_observatory`` keeping only
    complete groups, and all counts below are counts of groups.

    Parameters
    ----------
    session
        Database session used for the queries.
    pipeline_config : dict
        Pipeline configuration; the ``flows.construct_stray_light`` section and
        ``file_version`` are read.
    reference_time : datetime
        Center of the search window.
    reference_files : list[File]
        Placeholder records of the models. All of them are mutated in place
        (state set to ``"impossible"``) when the window is closed to new Level 0
        data and either half holds fewer than ``min_files_per_half`` Level 0 groups.

    Returns
    -------
    list
        ``file_id`` of every selected Level 1 file, group by group (first half
        followed by second half), or an empty list when the models should not be
        produced yet.
    """
    logger = get_run_logger()

    flow_config = pipeline_config['flows']['construct_stray_light']
    min_per_half = flow_config['min_files_per_half']
    max_per_half = flow_config['max_files_per_half']
    half_width = timedelta(hours=flow_config['max_hours_per_half'])
    window_start = reference_time - half_width
    window_end = reference_time + half_width

    # Once the window's end is older than this many days, no further L0 files are expected
    grace_period = timedelta(days=flow_config['new_L0_impossible_after_days'])
    more_L0_impossible = datetime.now() - window_end > grace_period

    l1_types = ('YP', 'YM', 'YZ')
    l0_types = ('PP', 'PM', 'PZ')

    candidates = (session.query(File)
                  .filter(File.state.in_(["created", "progressed"]))
                  .filter(File.observatory == reference_files[0].observatory)
                  .filter(~File.outlier))

    def _in_window(file_types, level, lower, upper):
        # Files of the given types/level inside [lower, upper], oldest first
        return (candidates
                .filter(File.date_obs >= lower)
                .filter(File.date_obs <= upper)
                .filter(File.file_type.in_(file_types))
                .filter(File.level == level)
                .order_by(File.date_obs.asc())
                .all())

    before_l1 = _in_window(l1_types, "1", window_start, reference_time)
    after_l1 = _in_window(l1_types, "1", reference_time, window_end)
    before_l0 = _in_window(l0_types, "0", window_start, reference_time)
    after_l0 = _in_window(l0_types, "0", reference_time, window_end)

    # Observatory 4 uses the opposite ordering within a group
    order = 'MZP' if reference_files[0].observatory == '4' else 'PZM'

    # The "before" halves are reversed so that, like the "after" halves, their
    # leading entries are the groups returned nearest to the reference time
    before_l1 = group_l2_inputs_single_observatory(before_l1, order, only_complete=True)[::-1]
    after_l1 = group_l2_inputs_single_observatory(after_l1, order, only_complete=True)
    before_l0 = group_l2_inputs_single_observatory(before_l0, order, only_complete=True)[::-1]
    after_l0 = group_l2_inputs_single_observatory(after_l0, order, only_complete=True)

    if more_L0_impossible:
        if len(before_l0) < min_per_half or len(after_l0) < min_per_half:
            for placeholder in reference_files:
                placeholder.state = "impossible"
                # Record who deemed this to be impossible
                placeholder.file_version = pipeline_config["file_version"]
                placeholder.software_version = __version__
                placeholder.date_created = datetime.now()
            return []

        # Allow 5% of the L0s to not be processed, in case a few fail
        l1s_caught_up = (len(before_l1) >= 0.95 * len(before_l0)
                         and len(after_l1) >= 0.95 * len(after_l0))
        enough_l1s = len(before_l1) > min_per_half and len(after_l1) > min_per_half
        if not (l1s_caught_up and enough_l1s):
            return []

        # Use the same number of groups on each side of the reference time
        balanced = min(len(before_l1), len(after_l1))
        before_l1 = before_l1[:balanced]
        after_l1 = after_l1[:balanced]
    elif len(before_l1) < max_per_half or len(after_l1) < max_per_half:
        # More L0s may still arrive, so only proceed once both halves are full
        return []

    # The queries are not limited, so cap each half here before flattening the groups
    selected = [member
                for half in (before_l1[:max_per_half], after_l1[:max_per_half])
                for group in half
                for member in group]
    logger.info(f"{len(selected)} Level 1 Y*{reference_files[0].observatory} files will be used "
                "for stray light estimation.")
    return [f.file_id for f in selected]
[docs]
def construct_stray_light_flow_info(level1_files: list[File],
                                    level1_stray_light_files: File,
                                    pipeline_config: dict,
                                    reference_time: datetime,
                                    file_type: str | list[str],
                                    spacecraft: str,
                                    session=None):
    """Build the planned ``construct_stray_light`` Flow record for one model (or polarized model set).

    Parameters
    ----------
    level1_files : list[File]
        Level 1 input files; their ``filename()`` values are stored in the call data.
    level1_stray_light_files : File
        Not used by this function.
    pipeline_config : dict
        Pipeline configuration; the initial priority of the flow is read from it.
    reference_time : datetime
        Reference time of the model, stored as ``%Y-%m-%d %H:%M:%S``.
    file_type : str or list[str]
        Model type code(s) being produced. The scheduler in this module passes a
        list (e.g. ``["SR"]``); a bare string is accepted as well.
    spacecraft : str
        Observatory code, stored in the call data.
    session
        Not used by this function.

    Returns
    -------
    Flow
        A level-1 flow in state ``"planned"`` whose JSON ``call_data`` holds
        ``filepaths``, ``reference_time``, ``spacecraft`` and ``is_polarized``.
    """
    flow_type = "construct_stray_light"

    # Comparing a plain "SR" string against ['SR'] would always be unequal and
    # flag a clear model as polarized, so normalise to a list before comparing.
    requested_types = [file_type] if isinstance(file_type, str) else list(file_type)
    is_polarized = requested_types != ["SR"]

    call_data = json.dumps(
        {
            "filepaths": [level1_file.filename() for level1_file in level1_files],
            "reference_time": reference_time.strftime("%Y-%m-%d %H:%M:%S"),
            "spacecraft": spacecraft,
            "is_polarized": is_polarized,
        }
    )
    return Flow(
        flow_type=flow_type,
        state="planned",
        flow_level="1",
        creation_time=datetime.now(),
        priority=pipeline_config["flows"][flow_type]["priority"]["initial"],
        call_data=call_data,
    )
[docs]
def construct_stray_light_file_info(level1_files: t.List[File],
                                    pipeline_config: dict,
                                    reference_time: datetime,
                                    file_type: t.Union[str, t.List[str]],
                                    spacecraft: str) -> t.List[File]:
    """Build the planned output File record(s) for a stray light model.

    Parameters
    ----------
    level1_files : list[File]
        Level 1 input files. Must be non-empty: the earliest and latest
        ``date_obs`` become ``date_beg`` and ``date_end`` of the outputs.
    pipeline_config : dict
        Pipeline configuration; ``file_version`` is read from it.
    reference_time : datetime
        Used as ``date_obs`` of the outputs.
    file_type : str or list[str]
        Model type code(s) being produced. The scheduler in this module passes a
        list (e.g. ``["SR"]``); a bare string is accepted as well.
    spacecraft : str
        Observatory code of the outputs.

    Returns
    -------
    list[File]
        A single ``SR`` record for a clear model (its polarization is copied from
        the first input file), otherwise the three records ``SM``, ``SZ``, ``SP``
        in that order. All records are level ``"1"`` in state ``"planned"``.

    Raises
    ------
    ValueError
        If ``level1_files`` is empty.
    """
    date_obses = [f.date_obs for f in level1_files]
    date_beg, date_end = min(date_obses), max(date_obses)

    # Accept both "SR" and ["SR"]; a bare string never compares equal to ['SR']
    requested_types = [file_type] if isinstance(file_type, str) else list(file_type)

    shared_fields = dict(
        level="1",
        observatory=spacecraft,
        file_version=pipeline_config["file_version"],
        software_version=__version__,
        date_obs=reference_time,
        date_beg=date_beg,
        date_end=date_end,
        state="planned",
    )

    if requested_types == ["SR"]:
        # Store the type code itself, not the list it may have arrived in
        return [File(file_type="SR",
                     polarization=level1_files[0].polarization,
                     **shared_fields)]

    return [File(file_type=f"S{polarization}",
                 polarization=polarization,
                 **shared_fields)
            for polarization in ("M", "Z", "P")]
[docs]
@flow
def construct_stray_light_scheduler_flow(pipeline_config_path=None, session=None, reference_time: datetime | None = None):
    """Schedule ``construct_stray_light`` flows for waiting stray light models whose inputs are ready.

    Steps:

    1. Stop early if the flow is disabled or its concurrency limit is already
       reached by planned/launched/running flows.
    2. Make sure a placeholder model record (state ``waiting``) exists for every
       model type (``SR``, ``SM``, ``SZ``, ``SP``), observatory (``1``-``4``) and
       time step from the configured ``t0`` up to the present.
    3. Group the waiting models by time, observatory and polarized/clear, sort
       them (closest to the configured ``target_date`` first, otherwise newest
       first) and ask the matching ``*_check_for_inputs`` function whether each
       can be built. Those functions may mark models as ``impossible``.
    4. For every model that is ready, delete its placeholder and hand the inputs
       to ``generic_scheduler_flow_logic``.

    Parameters
    ----------
    pipeline_config_path
        Passed to ``load_pipeline_configuration``.
    session
        Database session to use. A new one is opened with
        ``get_database_session`` only when this is ``None``.
    reference_time : datetime or None
        Not used by this flow; each model's own ``date_obs`` is the reference time.
    """
    # Honour an injected session rather than always opening a new one
    if session is None:
        session = get_database_session()
    pipeline_config = load_pipeline_configuration(pipeline_config_path)
    logger = get_run_logger()

    if not pipeline_config["flows"]['construct_stray_light'].get("enabled", True):
        logger.info("Flow 'construct_stray_light' is not enabled---halting scheduler")
        return

    max_flows = pipeline_config['flows']['construct_stray_light'].get('concurrency_limit', 1000)
    existing_flows = (session.query(Flow)
                      .where(Flow.flow_type == 'construct_stray_light')
                      .where(Flow.state.in_(["planned", "launched", "running"])).count())

    flows_to_schedule = max_flows - existing_flows
    if flows_to_schedule <= 0:
        logger.info("Our maximum flow count has been reached; halting")
        return
    logger.info(f"Will schedule up to {flows_to_schedule} flows")

    existing_models = (session.query(File)
                       .filter(File.level == "1")
                       .filter(File.file_type.in_(['SR', 'SM', 'SZ', 'SP']))
                       .all())
    logger.info(f"There are {len(existing_models)} model records in the DB")
    existing_models = {(model.file_type, model.observatory, model.date_obs): model for model in existing_models}

    t0 = datetime.strptime(pipeline_config['flows']['construct_stray_light']['t0'], "%Y-%m-%d %H:%M:%S")
    increment = timedelta(hours=float(pipeline_config['flows']['construct_stray_light']['model_spacing_hours']))
    n = 0
    # I'm sure there's a better way to do this, but let's step forward by increments to the present, and then we'll work
    # backwards back to t0, so that we prioritize the stray light models that QuickPUNCH uses
    while t0 + n * increment < datetime.now():
        n += 1

    for i in range(n, -1, -1):
        # Named model_time rather than `t`, which is this module's alias for `typing`
        model_time = t0 + i * increment
        for model_type in ['SR', 'SM', 'SZ', 'SP']:
            for observatory in ['1', '2', '3', '4']:
                key = (model_type, observatory, model_time)
                if existing_models.get(key, None) is None:
                    new_model = File(state='waiting',
                                     level='1',
                                     file_type=model_type,
                                     observatory=observatory,
                                     polarization='C' if model_type[1] == 'R' else model_type[1],
                                     date_obs=model_time,
                                     date_created=datetime.now(),
                                     file_version=pipeline_config["file_version"],
                                     software_version=__version__)
                    session.add(new_model)
                    existing_models[key] = new_model

    # The three polarized models of one time/observatory share a key so they are scheduled together
    waiting_models_by_time_and_type = defaultdict(list)
    for model in existing_models.values():
        if model.state == 'waiting':
            is_polarized = model.polarization != 'C'
            waiting_models_by_time_and_type[(model.date_obs, model.observatory, is_polarized)].append(model)
    logger.info(f"There are {len(waiting_models_by_time_and_type)} waiting models")

    dates = (session.query(func.min(File.date_obs), func.max(File.date_obs))
             .where(File.file_type.in_(['XR', 'YZ', 'YP', 'YM']))
             .where(File.state.in_(['progressed', 'created'])).all())
    if dates[0][0] is None:
        logger.info("There are no X files in the database")
        # Still persist any placeholder models created above
        session.commit()
        return
    earliest_input, latest_input = dates[0]

    target_date = pipeline_config.get('target_date', None)
    target_date = datetime.strptime(target_date, "%Y-%m-%d") if target_date else None
    if target_date:
        sorted_models = sorted(waiting_models_by_time_and_type.items(),
                               key=lambda x: abs((target_date - x[0][0]).total_seconds()))
    else:
        sorted_models = sorted(waiting_models_by_time_and_type.items(),
                               key=lambda x: x[0][0],
                               reverse=True)

    n_skipped = 0
    to_schedule = []
    for (date_obs, observatory, is_polarized), models in sorted_models:
        # Models outside the time span of the available inputs cannot be ready; skip the queries
        if not (earliest_input <= date_obs <= latest_input):
            n_skipped += 1
            continue
        if is_polarized:
            if len(models) != 3:
                logger.warning(f"Wrong number of waiting polarized models for {date_obs}, got {len(models)}---skipping")
                continue
            ready_files = construct_polarized_stray_light_check_for_inputs(
                session, pipeline_config, models[0].date_obs, models)
            if ready_files:
                to_schedule.append((models, ready_files))
                logger.info(f"Will schedule {' '.join(m.file_type + m.observatory for m in models)} "
                            f"at {models[0].date_obs}")
                if len(to_schedule) == flows_to_schedule:
                    break
        else:
            if len(models) != 1:
                logger.warning(f"Wrong number of waiting clear models for {date_obs}, got {len(models)}---skipping")
                continue
            model = models[0]
            ready_files = construct_clear_stray_light_check_for_inputs(
                session, pipeline_config, model.date_obs, model)
            if ready_files:
                to_schedule.append(([model], ready_files))
                logger.info(f"Will schedule {model.file_type}{model.observatory} at {model.date_obs}")
                if len(to_schedule) == flows_to_schedule:
                    break

    logger.info(f"{n_skipped} models fall outside the range of existing X files and were not queried")

    if to_schedule:
        for models, input_files in to_schedule:
            args_dictionary = {"file_type": [m.file_type for m in models], "spacecraft": models[0].observatory}
            dateobs = models[0].date_obs
            # Clear the placeholder model entry---it'll be regenerated in the scheduling flow
            for model in models:
                session.delete(model)

            generic_scheduler_flow_logic(
                lambda *args, **kwargs: [input_files],
                construct_stray_light_file_info,
                construct_stray_light_flow_info,
                pipeline_config,
                update_input_file_state=False,
                session=session,
                args_dictionary=args_dictionary,
                cap_planned_flows=False,
                reference_time=dateobs,
            )

        logger.info(f"Scheduled {len(to_schedule)} models")

    # Persists new placeholders, "impossible" markings and placeholder deletions
    session.commit()
[docs]
def construct_stray_light_call_data_processor(call_data: dict, pipeline_config, session) -> dict:
    """Turn a flow's stored call data into keyword arguments for the stray light estimators.

    ``call_data`` is modified in place and also returned:

    * ``filepaths`` is expanded with ``file_name_to_full_path`` using
      ``pipeline_config['root']``.
    * For polarized flows the paths are split, by position within each run of
      three, into ``mfilepaths``/``zfilepaths``/``pfilepaths`` and ``filepaths``
      is removed. The first and third positions are M and P for spacecraft
      ``'4'`` and P and M otherwise; the middle position is always Z.
    * ``spacecraft`` is removed, and ``num_workers`` (32) and ``num_loaders`` (5)
      are added.

    ``session`` is not used.
    """
    # Prepend the directory path to each input file
    full_paths = file_name_to_full_path(call_data['filepaths'], pipeline_config['root'])
    call_data['filepaths'] = full_paths

    if call_data['is_polarized']:
        leading, middle, trailing = full_paths[::3], full_paths[1::3], full_paths[2::3]
        call_data['zfilepaths'] = middle
        if call_data['spacecraft'] == '4':
            call_data['mfilepaths'], call_data['pfilepaths'] = leading, trailing
        else:
            call_data['pfilepaths'], call_data['mfilepaths'] = leading, trailing
        # The polarized call gets the per-polarizer lists instead of the combined one
        del call_data['filepaths']

    del call_data['spacecraft']
    call_data['num_workers'] = 32
    call_data['num_loaders'] = 5
    return call_data
[docs]
@flow
def construct_stray_light_process_flow(flow_id: int, pipeline_config_path=None, session=None):
    """Run one ``construct_stray_light`` flow through ``generic_process_flow_logic``.

    The work is delegated entirely: ``call_correct_stray_light_function`` is
    supplied as the function to execute and
    ``construct_stray_light_call_data_processor`` as the call-data processor.
    ``flow_id``, ``pipeline_config_path`` and ``session`` are forwarded unchanged.
    """
    generic_process_flow_logic(
        flow_id,
        call_correct_stray_light_function,
        pipeline_config_path,
        session=session,
        call_data_processor=construct_stray_light_call_data_processor,
    )
[docs]
def call_correct_stray_light_function(*args, **kwargs):
    """Dispatch to the clear or polarized stray light estimator.

    The required keyword ``is_polarized`` selects the estimator and is removed
    before the call. When it is truthy, ``num_workers`` is also removed (if
    present) and ``estimate_polarized_stray_light`` is called; otherwise
    ``estimate_stray_light`` is called. All remaining arguments are forwarded
    and the estimator's return value is returned.

    Raises
    ------
    KeyError
        If ``is_polarized`` is not among the keyword arguments.
    """
    polarized = kwargs.pop('is_polarized')
    if not polarized:
        return estimate_stray_light(*args, **kwargs)

    # num_workers is not forwarded to the polarized estimator
    kwargs.pop('num_workers', None)
    return estimate_polarized_stray_light(*args, **kwargs)