import os
import json
import random
import typing as t
from datetime import UTC, datetime, timedelta
from functools import partial
import numpy as np
from prefect import flow, get_run_logger, task
from prefect.cache_policies import NO_CACHE
from prefect.context import get_run_context
from punchbowl.levelq.f_corona_model import construct_qp_f_corona_model
from punchbowl.levelq.flow import levelq_CNN_core_flow, levelq_CTM_core_flow
from punchbowl.util import average_datetime
from sqlalchemy import and_, func, or_, select, text
from punchpipe import __version__
from punchpipe.control.cache_layer.nfi_l1 import wrap_if_appropriate
from punchpipe.control.db import File, Flow
from punchpipe.control.processor import generic_process_flow_logic
from punchpipe.control.scheduler import generic_scheduler_flow_logic
from punchpipe.control.util import get_database_session, group_files_by_time, load_pipeline_configuration
from punchpipe.flows.util import file_name_to_full_path
[docs]
@task(cache_policy=NO_CACHE)
def levelq_CNN_query_ready_files(session, pipeline_config: dict, reference_time=None, max_n=9e99):
    """Pick at most one batch of level-1 QR files (observatory "4") for the levelq_CNN flow.

    Returns an empty list when a ``levelq_CNN`` flow is already planned, when
    fewer than 1000 non-outlier files in state "created"/"progressed" exist for
    fitting, or when no file is in state "created". Otherwise returns a
    one-element list containing the first time-limited group of ready files,
    reversed relative to the newest-first database ordering.

    ``pipeline_config``, ``reference_time`` and ``max_n`` are accepted but not
    used by this function.
    """
    logger = get_run_logger()

    planned_flows = (session.query(Flow)
                     .filter(Flow.flow_type == "levelq_CNN")
                     .filter(Flow.state == "planned")
                     .all())
    if planned_flows:
        logger.info("A pending flow already exists. Skipping scheduling to let the batch grow.")
        return []

    # Make sure enough non-outlier images exist to fit against before scheduling anything
    fittable = (session.query(File)
                .filter(File.state.in_(("created", "progressed")))
                .filter(File.level == "1")
                .filter(File.observatory == "4")
                .filter(~File.outlier)
                .filter(File.file_type == "QR")
                .limit(1000)
                .all())
    if len(fittable) < 1000:
        logger.info("Not enough fittable files")
        return []

    ready = (session.query(File)
             .filter(File.state == "created")
             .filter(File.level == "1")
             .filter(File.observatory == "4")
             .filter(File.file_type == "QR")
             .order_by(File.date_obs.desc())
             .limit(1000)
             .all())
    logger.info(f"{len(ready)} ready files")
    if not ready:
        return []

    # We want a large batch, but not one spread too far in time: group the files with a maximum time span
    # and keep only the first group.
    first_group = group_files_by_time(ready, max_duration_seconds=60*60*24*15, max_per_group=1000)[0]
    # The database sort was newest-first; flip it so the batch runs oldest-to-newest
    batch = first_group[::-1]
    logger.info("1 group heading out")
    return [batch]
[docs]
@task(cache_policy=NO_CACHE)
def levelq_CNN_construct_flow_info(level1_files: list[File], levelq_file: File, pipeline_config: dict, session=None, reference_time=None):
    """Build the planned ``levelq_CNN`` Flow record for a batch of level-1 files.

    The call data holds the input file names and the average ``date_obs`` of the
    batch. ``levelq_file``, ``session`` and ``reference_time`` are not used here.
    """
    flow_type = "levelq_CNN"
    created_at = datetime.now()
    initial_priority = pipeline_config["flows"][flow_type]["priority"]["initial"]

    input_names = [level1_file.filename() for level1_file in level1_files]
    # This date_obs is only used to find other files to fit the PCA to, if there aren't enough
    # to-be-subtracted images in the batch
    mean_date_obs = average_datetime([f.date_obs for f in level1_files])
    payload = {
        "data_list": input_names,
        "date_obs": mean_date_obs.strftime("%Y-%m-%d %H:%M:%S"),
    }

    return Flow(
        flow_type=flow_type,
        state="planned",
        flow_level="Q",
        creation_time=created_at,
        priority=initial_priority,
        call_data=json.dumps(payload),
    )
[docs]
@task
def levelq_CNN_construct_file_info(level1_files: t.List[File], pipeline_config: dict, reference_time=None) -> t.List[File]:
    """Create one planned level-Q "CN" File record per input level-1 file.

    Each output copies ``date_obs``, ``outlier`` and ``bad_packets`` from its
    input. ``reference_time`` is not used.
    """
    planned_outputs = []
    for source_file in level1_files:
        planned_outputs.append(
            File(
                level="Q",
                file_type="CN",
                observatory="N",
                polarization="C",
                file_version=pipeline_config["file_version"],
                software_version=__version__,
                date_obs=source_file.date_obs,
                state="planned",
                outlier=source_file.outlier,
                bad_packets=source_file.bad_packets,
            )
        )
    return planned_outputs
[docs]
@flow
def levelq_CNN_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None):
    """Schedule levelq_CNN flows by handing the CNN query/construct tasks to the generic scheduler.

    Output files are declared one-to-one with the inputs
    (``children_are_one_to_one=True``).
    """
    scheduler_options = {
        "reference_time": reference_time,
        "session": session,
        "children_are_one_to_one": True,
    }
    generic_scheduler_flow_logic(
        levelq_CNN_query_ready_files,
        levelq_CNN_construct_file_info,
        levelq_CNN_construct_flow_info,
        pipeline_config_path,
        **scheduler_options,
    )
[docs]
def levelq_CNN_call_data_processor(call_data: dict, pipeline_config, session) -> dict:
    """Expand stored levelq_CNN call data into the arguments used at run time.

    The entries of ``call_data['data_list']`` are converted to full paths under
    ``pipeline_config['root']``. Additional level-1 QR files (observatory "4",
    non-outlier, state "created"/"progressed") are then looked up, nearest in
    time to ``call_data['date_obs']`` but more than 10 minutes from it, to top
    the batch up to ``target_number`` images for the PCA fit. Those paths are
    passed through ``wrap_if_appropriate`` and stored under
    ``call_data['files_to_fit']``; ``call_data['date_obs']`` is removed.

    ``call_data`` is modified in place and also returned.
    """
    root = pipeline_config['root']

    # Prepend the data root to each input file
    for key in ['data_list']:
        if call_data[key] is not None:
            call_data[key] = file_name_to_full_path(call_data[key], root)

    # How many files we want for the PCA fitting
    target_number = 1100

    # Absolute time offset, in seconds, between each file and the batch's date_obs
    dt = func.abs(func.timestampdiff(text("second"), File.date_obs, call_data['date_obs']))
    rows = session.execute(
        select(File, dt)
        .filter(File.state.in_(("created", "progressed")))
        .filter(File.level == "1")
        .filter(File.file_type == "QR")
        .filter(File.observatory == "4")
        .filter(~File.outlier)
        .filter(dt > 10 * 60)
        .order_by(dt.asc())
        .limit(target_number)).all()
    candidate_paths = [os.path.join(f.directory(root), f.filename()) for f, _ in rows]

    # Remove files that we're subtracting. A set gives O(1) membership tests instead of
    # scanning the whole data list once per candidate.
    being_subtracted = set(call_data['data_list'])
    candidate_paths = [path for path in candidate_paths if path not in being_subtracted]

    # Figure out how many of these extra files we need to meet our target number for fitting
    n_to_use = max(0, target_number - len(call_data['data_list']))
    call_data['files_to_fit'] = [wrap_if_appropriate(path) for path in candidate_paths[:n_to_use]]

    del call_data['date_obs']
    return call_data
[docs]
@flow
def levelq_CNN_process_flow(flow_id: int | list[int], pipeline_config_path=None, session=None):
    """Run ``levelq_CNN_core_flow`` for the given flow id(s) through the generic process logic."""
    generic_process_flow_logic(
        flow_id,
        levelq_CNN_core_flow,
        pipeline_config_path,
        session=session,
        call_data_processor=levelq_CNN_call_data_processor,
    )
[docs]
@task(cache_policy=NO_CACHE)
def levelq_CTM_query_ready_files(session, pipeline_config: dict, reference_time=None, max_n=9e99):
    """Group ready level-1 QR files (observatories "1"-"3") into sets for the levelq_CTM flow.

    Files in state "created" are grouped by time (10 second maximum span).
    A group of three files is always accepted. A smaller group is accepted only
    if it is not newer than the optional ``ignore_missing_after_days`` cutoff
    and the number of matching level-0 records within one minute of the group
    equals the group size. At most ``max_n`` groups are returned.

    ``reference_time`` is not used.
    """
    logger = get_run_logger()

    l1_qr_obs_1_to_3 = and_(File.level == "1", File.file_type == "QR", File.observatory.in_(['1', '2', '3']))
    ready = (session.query(File)
             .filter(File.state == "created")
             .filter(or_(
                 l1_qr_obs_1_to_3,
                 # TODO: We're excluding NFI for now
                 # and_(File.level == "Q", File.file_type == "CN"),
             ))
             .order_by(File.date_obs.desc())
             .all())
    logger.info(f"{len(ready)} ready files")
    if not ready:
        return []

    time_groups = group_files_by_time(ready, max_duration_seconds=10)
    logger.info(f"{len(time_groups)} unique times")

    # Optional cutoff: groups newer than this may still be waiting on downlinks
    ignore_after_days = pipeline_config["flows"]["levelq_CTM"].get("ignore_missing_after_days", None)
    cutoff_time = None
    if ignore_after_days is not None:
        cutoff_time = datetime.now(tz=UTC) - timedelta(days=ignore_after_days)

    accepted_groups = []
    for group in time_groups:
        if len(accepted_groups) >= max_n:
            break

        # TODO: We're excluding NFI for now
        # a complete group would otherwise have 4 files
        if len(group) == 3:
            accepted_groups.append(group)
            continue

        # group[-1] is the newest file by date_obs
        if cutoff_time is not None and group[-1].date_obs.replace(tzinfo=UTC) > cutoff_time:
            # We're still potentially waiting for downlinks
            continue

        # We now have to consider making an incomplete trefoil. Look at the L0 files to see whether we're still
        # waiting on any L1s (especially important when reprocessing), using a time range around this group.
        center = group[0].date_obs
        half_width = timedelta(minutes=1)
        l0_types = ['QR']
        # Grab all the L0s that produce inputs for this trefoil
        expected_inputs = (session.query(File)
                           .filter(File.level == "0")
                           # TODO: This line temporarily excludes NFI
                           .filter(File.observatory.in_(['1', '2', '3']))
                           .filter(File.file_type.in_(l0_types))
                           .filter(File.date_obs > center - half_width)
                           .filter(File.date_obs < center + half_width)
                           .all())
        if len(expected_inputs) == len(group):
            # We have the L1s for all the L0s, and we don't expect new L0s, so let's make an incomplete mosaic
            accepted_groups.append(group)
        # Otherwise, we'll pass for now on processing this trefoil

    logger.info(f"{len(accepted_groups)} groups heading out")
    return accepted_groups
[docs]
@task(cache_policy=NO_CACHE)
def levelq_CTM_construct_flow_info(level1_files: list[File], levelq_file: File, pipeline_config: dict, session=None, reference_time=None):
    """Build the planned ``levelq_CTM`` Flow record for one group of level-1 files.

    The call data carries the input file names plus the optional
    ``alpha_file_path`` (default ``None``) and ``trim_edges_px`` (default ``0``)
    settings from the flow's configuration. ``levelq_file``, ``session`` and
    ``reference_time`` are not used here.
    """
    flow_type = "levelq_CTM"
    created_at = datetime.now()
    flow_config = pipeline_config["flows"][flow_type]
    initial_priority = flow_config["priority"]["initial"]

    payload = {
        "data_list": [level1_file.filename() for level1_file in level1_files],
        "alphas_file": flow_config.get("alpha_file_path", None),
        "trim_edges_px": flow_config.get("trim_edges_px", 0),
    }

    return Flow(
        flow_type=flow_type,
        state="planned",
        flow_level="Q",
        creation_time=created_at,
        priority=initial_priority,
        call_data=json.dumps(payload),
    )
[docs]
@task
def levelq_CTM_construct_file_info(level1_files: t.List[File], pipeline_config: dict, reference_time=None) -> t.List[File]:
    """Create the single planned level-Q "CT" File record for a group of inputs.

    ``date_obs`` is the average of the inputs' ``date_obs``; ``outlier`` and
    ``bad_packets`` are set if any input has them set. ``reference_time`` is
    not used.
    """
    mosaic = File(
        level="Q",
        file_type="CT",
        observatory="M",
        polarization="C",
        file_version=pipeline_config["file_version"],
        software_version=__version__,
        date_obs=average_datetime([source.date_obs for source in level1_files]),
        state="planned",
        outlier=any(source.outlier for source in level1_files),
        bad_packets=any(source.bad_packets for source in level1_files),
    )
    return [mosaic]
[docs]
@flow
def levelq_CTM_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None):
    """Schedule levelq_CTM flows by handing the CTM query/construct tasks to the generic scheduler."""
    scheduler_options = {
        "reference_time": reference_time,
        "session": session,
    }
    generic_scheduler_flow_logic(
        levelq_CTM_query_ready_files,
        levelq_CTM_construct_file_info,
        levelq_CTM_construct_flow_info,
        pipeline_config_path,
        **scheduler_options,
    )
[docs]
def levelq_CTM_call_data_processor(call_data: dict, pipeline_config, session=None) -> dict:
    """Replace the file names in ``call_data['data_list']`` with full paths under the configured root.

    ``call_data`` is modified in place and returned. ``session`` is not used.
    """
    data_root = pipeline_config['root']
    full_paths = file_name_to_full_path(call_data['data_list'], data_root)
    call_data['data_list'] = full_paths
    return call_data
[docs]
@flow
def levelq_CTM_process_flow(flow_id: int | list[int], pipeline_config_path=None, session=None):
    """Run ``levelq_CTM_core_flow`` for the given flow id(s) through the generic process logic."""
    generic_process_flow_logic(
        flow_id,
        levelq_CTM_core_flow,
        pipeline_config_path,
        session=session,
        call_data_processor=levelq_CTM_call_data_processor,
    )
[docs]
@task(cache_policy=NO_CACHE)
def levelq_upload_query_ready_files(session, pipeline_config: dict, reference_time=None):
    """Collect the ids of level-Q files in state "created" for upload.

    When the ``levelq_upload`` flow config has a finite ``lookback_days``, only
    files with ``date_obs`` within that many days of now are selected;
    otherwise every "created" level-Q file is selected.

    Always returns a one-element list containing the list of file ids (which
    may itself be empty). ``reference_time`` is not used.

    Caching is disabled, matching the other query tasks in this module that
    receive a database session.
    """
    logger = get_run_logger()
    lookback_days = pipeline_config['flows']["levelq_upload"].get("lookback_days", np.inf)

    ready_query = session.query(File).filter(File.state == "created").filter(File.level == "Q")
    if np.isfinite(lookback_days):
        ready_query = ready_query.filter(File.date_obs >= datetime.now(UTC) - timedelta(days=lookback_days))
    all_ready_files = ready_query.all()
    logger.info(f"{len(all_ready_files)} ready files")

    # Only the number of in-progress files is reported, so count in the database
    # instead of loading every row.
    n_currently_creating = (session.query(File)
                            .filter(File.state == "creating")
                            .filter(File.level == "Q")
                            .count())
    logger.info(f"{n_currently_creating} level Q files currently being processed")

    out = [f.file_id for f in all_ready_files]
    logger.info(f"Delivering {len(out)} level Q files in this batch.")
    return [out]
[docs]
@task(cache_policy=NO_CACHE)
def levelq_upload_construct_flow_info(levelq_files: list[File], intentionally_empty: File, pipeline_config: dict, session=None, reference_time=None):
    """Build the planned ``levelq_upload`` Flow record for a batch of level-Q files.

    The call data holds the file names to upload and the ``bucket_name`` from
    the pipeline configuration. ``intentionally_empty``, ``session`` and
    ``reference_time`` are not used here.

    Caching is disabled, matching the other flow-info tasks in this module
    that accept a database session.
    """
    flow_type = "levelq_upload"
    creation_time = datetime.now()
    priority = pipeline_config["flows"][flow_type]["priority"]["initial"]

    call_data = json.dumps(
        {
            "data_list": [levelq_file.filename() for levelq_file in levelq_files],
            "bucket_name": pipeline_config["bucket_name"],
        }
    )
    return Flow(
        flow_type=flow_type,
        state="planned",
        flow_level="Q",
        creation_time=creation_time,
        priority=priority,
        call_data=call_data,
    )
[docs]
@task
def levelq_upload_construct_file_info(level1_files: t.List[File], pipeline_config: dict, reference_time=None) -> t.List[File]:
    """Return no output File records: this always yields an empty list, whatever the arguments."""
    no_outputs: t.List[File] = []
    return no_outputs
[docs]
@flow
def levelq_upload_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None):
    """Schedule levelq_upload flows by handing the upload query/construct tasks to the generic scheduler."""
    scheduler_options = {
        "reference_time": reference_time,
        "session": session,
    }
    generic_scheduler_flow_logic(
        levelq_upload_query_ready_files,
        levelq_upload_construct_file_info,
        levelq_upload_construct_flow_info,
        pipeline_config_path,
        **scheduler_options,
    )
[docs]
@flow
def levelq_upload_core_flow(data_list, bucket_name, aws_profile="noaa-prod"):
    """Copy a manifest, the data files, and their ``.sha`` companions to ``bucket_name``.

    A manifest listing every data file and its ``.sha`` companion is written
    with ``write_manifest`` and copied first, followed by each file, using
    ``aws s3 cp`` under the ``aws_profile`` profile.

    Every copy is attempted even if an earlier one fails. If any copy exits
    with a non-zero status, a ``RuntimeError`` naming the failed paths is
    raised once all copies have been attempted, so a failed upload is not
    reported as success.

    The caller's ``data_list`` is not modified.
    """
    import subprocess

    # Build a new list instead of extending the caller's list in place
    upload_list = list(data_list) + [fn + '.sha' for fn in data_list]
    manifest_path = write_manifest(upload_list)

    failed = []
    for path in [manifest_path, *upload_list]:
        # Argument-list form: no shell, so paths with spaces or shell metacharacters are passed verbatim
        result = subprocess.run(["aws", "--profile", aws_profile, "s3", "cp", path, bucket_name])
        if result.returncode != 0:
            failed.append(path)

    if failed:
        raise RuntimeError(f"aws s3 cp failed for {len(failed)} file(s): {failed}")
[docs]
def write_manifest(file_names):
now = datetime.now(UTC)
stamp = now.strftime("%Y%m%d%H%M%S")
manifest_name = os.path.join('/mnt/archive/soc/data/noaa_manifests', f"PUNCH_LQ_manifest_{stamp}.txt")
with open(manifest_name, "w") as f:
f.write("\n".join([os.path.basename(fn) for fn in file_names]))
return manifest_name
[docs]
@flow
def levelq_upload_process_flow(flow_id, pipeline_config_path=None, session=None):
    """Run the level-Q upload for the Flow database entry with id ``flow_id``.

    The entry is marked "running" with the Prefect flow run name and id, its
    stored call data is loaded, the file names are expanded to full paths under
    the configured root, and ``levelq_upload_core_flow`` is called. The entry is
    then marked "completed", or "failed" if the core flow raised, in which case
    the exception is re-raised after the state has been committed.

    A new database session is opened only when ``session`` is None.
    """
    logger = get_run_logger()

    if session is None:
        session = get_database_session()

    pipeline_config = load_pipeline_configuration(pipeline_config_path)

    # fetch the appropriate flow db entry
    flow_db_entry = session.query(Flow).where(Flow.flow_id == flow_id).one()
    logger.info(f"Running on flow db entry with id={flow_db_entry.flow_id}.")

    # update the processing flow name with the flow run name from Prefect
    flow_run_context = get_run_context()
    flow_db_entry.flow_run_name = flow_run_context.flow_run.name
    flow_db_entry.flow_run_id = flow_run_context.flow_run.id
    flow_db_entry.state = "running"
    flow_db_entry.start_time = datetime.now(UTC)
    session.commit()

    # load the call data and launch the core flow
    flow_call_data = json.loads(flow_db_entry.call_data)
    logger.info(f"Running with {flow_call_data}")
    flow_call_data['data_list'] = file_name_to_full_path(flow_call_data['data_list'], pipeline_config['root'])
    try:
        levelq_upload_core_flow(**flow_call_data)
    except Exception:
        flow_db_entry.state = "failed"
        flow_db_entry.end_time = datetime.now(UTC)
        logger.error("Something's gone wrong - levelq_upload_core_flow failed")
        session.commit()
        # Bare raise keeps the original traceback intact
        raise
    else:
        flow_db_entry.state = "completed"
        flow_db_entry.end_time = datetime.now(UTC)
        session.commit()
[docs]
@task(cache_policy=NO_CACHE)
def levelq_CFM_query_ready_files(session, pipeline_config: dict, reference_time: datetime):
    """Find level-Q "CT" mosaic files usable for an F-corona model at ``reference_time``.

    Files in state "created"/"progressed" with ``date_obs`` between
    ``reference_time - max_hours_per_half`` hours and ``reference_time`` are
    queried, limited to ``2 * max_files_per_half`` rows. The limits come from
    the ``construct_f_corona_background`` flow configuration.

    Returns the flat list of files when strictly more than
    ``2 * min_files_per_half`` were found, otherwise an empty list.
    """
    logger = get_run_logger()

    background_config = pipeline_config['flows']['construct_f_corona_background']
    min_files_per_half = background_config['min_files_per_half']
    max_files_per_half = background_config['max_files_per_half']
    max_hours_per_half = background_config['max_hours_per_half']

    window_start = reference_time - timedelta(hours=max_hours_per_half)
    window_end = reference_time + timedelta(weeks=0)

    candidates = (session.query(File)
                  .filter(File.state.in_(["created", "progressed"]))
                  .filter(File.date_obs >= window_start)
                  .filter(File.date_obs <= window_end)
                  .filter(File.level == "Q")
                  .filter(File.file_type == "CT")
                  .filter(File.observatory == "M")
                  .limit(2 * max_files_per_half)
                  .all())

    # Too few images to build a model from
    if len(candidates) <= 2 * min_files_per_half:
        return []

    logger.info(f"{len(candidates)} Level Q CTM files will be used for F corona background modeling.")
    return candidates
[docs]
@task(cache_policy=NO_CACHE)
def construct_levelq_CFM_flow_info(levelq_CTM_files: list[File],
                                   levelq_CFM_model_file: File,
                                   pipeline_config: dict,
                                   reference_time: datetime,
                                   session=None
                                   ):
    """Build the planned ``levelq_CFM`` Flow record for one model.

    The call data holds the input CTM file names and ``reference_time`` as a
    string. ``levelq_CFM_model_file`` and ``session`` are not used here.
    """
    flow_type = "levelq_CFM"
    created_at = datetime.now()
    initial_priority = pipeline_config["flows"][flow_type]["priority"]["initial"]

    payload = {
        "filenames": [ctm_file.filename() for ctm_file in levelq_CTM_files],
        "reference_time": str(reference_time)
    }

    return Flow(
        flow_type=flow_type,
        state="planned",
        flow_level="Q",
        creation_time=created_at,
        priority=initial_priority,
        call_data=json.dumps(payload),
    )
[docs]
@task
def construct_levelq_CFM_file_info(levelq_files: t.List[File], pipeline_config: dict,
                                   reference_time: datetime) -> t.List[File]:
    """Create the single planned level-Q "CF" File record (observatory "M") dated at ``reference_time``.

    ``levelq_files`` is not used.
    """
    model_file = File(
        level="Q",
        file_type="CF",
        observatory="M",
        file_version=pipeline_config["file_version"],
        software_version=__version__,
        date_obs=reference_time,
        state="planned",
    )
    return [model_file]
[docs]
@flow
def levelq_CFM_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None):
    """Schedule ``levelq_CFM`` model-building flows on a fixed time grid.

    Model times run from the configured ``t0`` in steps of
    ``model_spacing_hours``. For every grid time that has no level-Q "CF"
    record yet, a placeholder File in state "waiting" is added. Waiting models
    are then visited newest first, and each one that has enough input files
    (per ``levelq_CFM_query_ready_files``) is scheduled through
    ``generic_scheduler_flow_logic``, up to the number of free flow slots.

    The number of free slots is ``2 * concurrency_limit`` (default limit 1000)
    minus the flows already "planned", "launched" or "running".

    A new database session is opened only when ``session`` is None.
    ``reference_time`` is not used.

    Returns 0 when the flow is disabled in the configuration, otherwise None.
    """
    # Honor a caller-supplied session; only open a new one when none was given
    if session is None:
        session = get_database_session()
    pipeline_config = load_pipeline_configuration(pipeline_config_path)
    logger = get_run_logger()

    if not pipeline_config["flows"]['levelq_CFM'].get("enabled", True):
        logger.info("Flow 'levelq_CFM' is not enabled---halting scheduler")
        return 0

    max_flows = 2 * pipeline_config['flows']['levelq_CFM'].get('concurrency_limit', 1000)
    existing_flows = (session.query(Flow)
                      .where(Flow.flow_type == 'levelq_CFM')
                      .where(Flow.state.in_(["planned", "launched", "running"])).count())

    flows_to_schedule = max_flows - existing_flows
    if flows_to_schedule <= 0:
        logger.info("Our maximum flow count has been reached; halting")
        return
    else:
        logger.info(f"Will schedule up to {flows_to_schedule} flows")

    existing_models = (session.query(File)
                       .filter(File.level == "Q")
                       .filter(File.file_type == 'CF')
                       .all())
    logger.info(f"There are {len(existing_models)} model records in the DB")
    existing_models = {(model.file_type, model.observatory, model.date_obs): model for model in existing_models}

    t0 = datetime.strptime(pipeline_config['flows']['levelq_CFM']['t0'], "%Y-%m-%d %H:%M:%S")
    increment = timedelta(hours=float(pipeline_config['flows']['levelq_CFM']['model_spacing_hours']))
    n = 0
    models_to_try_creating = []
    # Step forward by increments to the present, then work backwards to t0, so that the most recent
    # models are considered first
    while t0 + n * increment < datetime.now():
        n += 1

    for i in range(n, -1, -1):
        # Named model_time so the module-level `typing as t` alias is not shadowed
        model_time = t0 + i * increment
        model_type = "CF"
        observatory = "M"
        key = (model_type, observatory, model_time)
        model = existing_models.get(key, None)
        if model is None:
            new_model = File(state='waiting',
                             level='Q',
                             file_type=model_type,
                             observatory=observatory,
                             polarization=model_type[0],
                             date_obs=model_time,
                             date_created=datetime.now(),
                             file_version=pipeline_config["file_version"],
                             software_version=__version__)
            session.add(new_model)
            models_to_try_creating.append(new_model)
        elif model.state == 'waiting':
            models_to_try_creating.append(model)

    session.commit()

    logger.info(f"There are {len(models_to_try_creating)} waiting models")

    to_schedule = []
    for model in models_to_try_creating:
        ready_files = levelq_CFM_query_ready_files(
            session, pipeline_config, model.date_obs)
        if ready_files:
            to_schedule.append((model, ready_files))
            logger.info(f"Will schedule {model.file_type} at {model.date_obs}")
            if len(to_schedule) == flows_to_schedule:
                break

    if len(to_schedule):
        for model, input_files in to_schedule:
            dateobs = model.date_obs
            # Clear the placeholder model entry---it'll be regenerated in the scheduling flow
            session.delete(model)
            generic_scheduler_flow_logic(
                lambda *args, **kwargs: [input_files],
                construct_levelq_CFM_file_info,
                construct_levelq_CFM_flow_info,
                pipeline_config,
                update_input_file_state=False,
                session=session,
                cap_planned_flows=False,
                reference_time=dateobs,
            )

        logger.info(f"Scheduled {len(to_schedule)} models")
        session.commit()
[docs]
def levelq_CFM_call_data_processor(call_data: dict, pipeline_config, session=None) -> dict:
    """Replace the file names in ``call_data['filenames']`` with full paths under the configured root.

    ``call_data`` is modified in place and returned. ``session`` is not used.
    """
    data_root = pipeline_config['root']
    full_paths = file_name_to_full_path(call_data['filenames'], data_root)
    call_data['filenames'] = full_paths
    return call_data
[docs]
@flow
def levelq_CFM_process_flow(flow_id: int | list[int], pipeline_config_path=None, session=None):
    """Run ``construct_qp_f_corona_model`` for the given flow id(s) through the generic process logic."""
    generic_process_flow_logic(
        flow_id,
        construct_qp_f_corona_model,
        pipeline_config_path,
        session=session,
        call_data_processor=levelq_CFM_call_data_processor,
    )
[docs]
@task(cache_policy=NO_CACHE)
def levelq_CFN_query_ready_files(session, pipeline_config: dict, reference_time: datetime, use_n: int = 50):
    """Pick a random sample of level-Q "CN" files for an F-corona model at ``reference_time``.

    Files in state "created"/"progressed" with ``date_obs`` in the four weeks
    up to and including ``reference_time`` are queried. When more than 30 are
    found, they are shuffled and the ids of the first ``use_n`` are returned as
    a one-element list of lists; otherwise an empty list is returned.

    ``pipeline_config`` is not used. Caching is disabled, matching the other
    query tasks in this module that receive a database session.
    """
    before = reference_time - timedelta(weeks=4)
    after = reference_time + timedelta(weeks=0)

    logger = get_run_logger()
    all_ready_files = (session.query(File)
                       .filter(File.state.in_(["created", "progressed"]))
                       .filter(File.date_obs >= before)
                       .filter(File.date_obs <= after)
                       .filter(File.level == "Q")
                       .filter(File.file_type == "CN")
                       .filter(File.observatory == "N")
                       .all())
    logger.info(f"{len(all_ready_files)} Level Q CNN files will be used for F corona background modeling.")

    # Require more than 30 images before building a model
    if len(all_ready_files) <= 30:
        return []

    random.shuffle(all_ready_files)
    return [[f.file_id for f in all_ready_files[:use_n]]]
[docs]
@task(cache_policy=NO_CACHE)
def construct_levelq_CFN_flow_info(levelq_CNN_files: list[File],
                                   levelq_CFN_model_file: File,
                                   pipeline_config: dict,
                                   reference_time: datetime,
                                   session=None
                                   ):
    """Build the planned CFN model Flow record.

    The call data holds the input CNN file names and ``reference_time`` as a
    string. ``levelq_CFN_model_file`` and ``session`` are not used here.

    Caching is disabled, matching the other flow-info tasks in this module
    that accept a database session.
    """
    # NOTE(review): the capital "Q" differs from every other flow type in this module
    # (e.g. "levelq_CFM"). It is both the stored flow_type and the config lookup key,
    # so confirm it matches the pipeline configuration before changing it.
    flow_type = "levelQ_CFN"
    creation_time = datetime.now()
    priority = pipeline_config["flows"][flow_type]["priority"]["initial"]

    call_data = json.dumps(
        {
            "filenames": [cnn_file.filename() for cnn_file in levelq_CNN_files],
            "reference_time": str(reference_time)
        }
    )
    return Flow(
        flow_type=flow_type,
        state="planned",
        flow_level="Q",
        creation_time=creation_time,
        priority=priority,
        call_data=call_data,
    )
[docs]
@task
def construct_levelq_CFN_background_file_info(levelq_files: t.List[File], pipeline_config: dict,
                                              reference_time: datetime) -> t.List[File]:
    """Create the single planned level-Q "CF" File record (observatory "N") dated at ``reference_time``.

    ``levelq_files`` is not used.
    """
    model_file = File(
        level="Q",
        file_type="CF",
        observatory="N",
        file_version=pipeline_config["file_version"],
        software_version=__version__,
        date_obs=reference_time,
        state="planned",
    )
    return [model_file]
[docs]
@flow
def levelq_CFN_scheduler_flow(pipeline_config_path=None, session=None, reference_time=None):
    """Schedule a CFN model flow through the generic scheduler.

    ``reference_time`` falls back to the current UTC time when it is not given.
    Input file states are left untouched (``update_input_file_state=False``).
    """
    if not reference_time:
        reference_time = datetime.now(UTC)

    scheduler_options = {
        "update_input_file_state": False,
        "reference_time": reference_time,
        "session": session,
    }
    generic_scheduler_flow_logic(
        levelq_CFN_query_ready_files,
        construct_levelq_CFN_background_file_info,
        construct_levelq_CFN_flow_info,
        pipeline_config_path,
        **scheduler_options,
    )
[docs]
def levelq_CFN_call_data_processor(call_data: dict, pipeline_config, session=None) -> dict:
    """Replace the file names in ``call_data['filenames']`` with full paths under the configured root.

    ``call_data`` is modified in place and returned. ``session`` is not used.
    """
    data_root = pipeline_config['root']
    full_paths = file_name_to_full_path(call_data['filenames'], data_root)
    call_data['filenames'] = full_paths
    return call_data
[docs]
@flow
def levelq_CFN_process_flow(flow_id: int | list[int], pipeline_config_path=None, session=None):
    """Run ``construct_qp_f_corona_model`` with ``product_code="CFN"`` for the given flow id(s)."""
    build_cfn_model = partial(construct_qp_f_corona_model, product_code="CFN")
    generic_process_flow_logic(
        flow_id,
        build_cfn_model,
        pipeline_config_path,
        session=session,
        call_data_processor=levelq_CFN_call_data_processor,
    )