# Daily Index Slice Creation

While the crawlers run continuously, the produced WARC files are only processed once a day. This section explains the infrastructure behind the workflows that run daily and turn the WARC files for one day into usable index slices.

The figure below shows an overview of the different components in this infrastructure.

```{image} figures/daily-index-slice-creation.png
```

## Workflow Overview

Each day at 3:00 AM, we start processing the crawl data of the previous day. For each crawl site, we check whether the associated S3 contains new WARC files. If not, we terminate the workflow. Otherwise, we start a batch workflow with several HPC tasks (visible in the figure above). For each HPC batch job, we use the number of WARC files to estimate how many nodes we will need to run the workflow, and for how long.

The workflows are published as [a set of script collections in our GitLab group](https://opencode.it4i.eu/openwebsearcheu-public/workflows/). Each batch job in the daily workflow has a dedicated repository, which contains:
- A `conf` directory in which data center specific information (such as scratch locations, Slurm parameters, etc.) can be defined;
- An `install.sh` script, to install the dependencies necessary to run this workflow;
- An `entrypoint.sh` script, which can be called within an HPC batch job allocation to execute the workflow; and
- An `sbatch_trigger.sh` script, which can be used to allocate the necessary number of nodes through Slurm and then run the entrypoint script.

The workflow repositories and the required software packages are all installed under a project directory on the HPC filesystem. We distinguish between `dev` and `prod` versions of the workflows and software (though this distinction is unnecessary for parties who only wish to run the workflows without developing new versions). As a result, our project space looks something like the following:

```
$PROJECT_DIR/
    workflows/
        dev/
            warc-staging-workflow/
            resilipipe-workflow/
            ...
        prod/
            ...
    software/
        dev/
            resilipipe/
            open-web-indexer/
            ...
        ...
    ...
```

After each HPC task, we store all intermediate data on HPC scratch storage, both to reduce I/O overhead and to ensure we can restart failed components of the workflows without having to re-run the whole pipeline. The data is stored in a directory for this specific daily execution of the workflows, which is cleaned after the whole workflow has finished successfully. This execution-specific directory is referred to as `$WORKFLOW_SCRATCH_DIR`. Its structure is shown below.

```
$SCRATCH/
    workflows/
        dev/
            OpenWebIndex_Main_V2_DATACENTER/
                year=YYYY/
                    month=MM/
                        day=DD/
                            warc_staging/
                            parquet_staging/
                            index_staging/
                            ...
        prod/
            ...
```

In the rest of this section, we describe the individual HPC tasks of the daily workflows.

## HPC Tasks

### WARC Staging [[repo](https://opencode.it4i.eu/openwebsearcheu-public/workflows/warc-staging-workflow)]

To simplify further processing, we first copy all relevant WARC files to HPC scratch storage. The files are stored in `$WORKFLOW_SCRATCH_DIR/warc_staging`.

### Resilipipe [[repo](https://opencode.it4i.eu/openwebsearcheu-public/workflows/resilipipe-workflow)]

[Resilipipe](https://opencode.it4i.eu/openwebsearcheu-public/resilipipe/) is our modular, scalable framework for preprocessing and content analysis of the WARC files. It takes in WARC files and transforms each WARC file into a gzipped Parquet file with the main content, language, relevant [collection indices](dnt:collection-indices), etc. of each crawled page. See Resilipipe's README to see which modules are currently implemented, and to find a full overview of the extracted content per page.

The Parquet files are stored in `$WORKFLOW_SCRATCH_DIR/parquet_staging`. Once a WARC file was fully processed and the corresponding Parquet file was created, the WARC file is deleted to save storage.

### Indexing [[repo](https://opencode.it4i.eu/openwebsearcheu-public/workflows/indexing-workflow)]

The indexing workflow uses our Spark-based [Open Web Indexer](https://opencode.it4i.eu/openwebsearcheu-public/open-web-indexer) to transform the preprocessed Web content into index shards. For each [collection index](dnt:collection-indices) marked by Resilipipe, the indexer creates a dedicated subset, each of which is further split up into language-specific shards. For each shard, we include Parquet files containing the corresponding documents and metadata, as well as an `index.ciff.gz` that contains an (unstopped and unstemmed) inverted file representation of the index shard in the CIFF format.

We deploy Spark within HPC infrastructure using the [`spark-deployment`](https://opencode.it4i.eu/openwebsearcheu-public/spark-deployment) repository, which in turn uses the [Magpie](https://github.com/LLNL/magpie/) script collection.

The index shards are stored in the `$WORKFLOW_SCRATCH_DIR/index_staging` directory, with subdirectories for each collection index:

```
$WORKFLOW_SCRATCH_DIR/
    index_staging/
        collection=main/
            resource=owi/
                language=eng/
                    index.ciff.gz
                    metadata_0.parquet
                    ...
                ...
        collection=curlie_full/
            ...
        ...
    ...
```

For each collection index, we create a new iRODS collection, which can be accessed via the dashboard and [owilix](dnt:owi-access-owilix).

### Resilipipe GPU [[repo](https://opencode.it4i.eu/openwebsearcheu-public/workflows/resilipipe-gpu-workflow)]

The `gpu` collection index contains documents that are deemed of high quality and/or importance, and for which we want to enable further GPU-based processing. This is done using [Resilipipe GPU](https://opencode.it4i.eu/openwebsearcheu-public/resilipipe-gpu), which runs on the GPU partition of an HPC cluster and computes embeddings (and other embedding-based features) for each document.

The documents are read from the `$WORKFLOW_SCRATCH_DIR/index_staging/collection=gpu` directory, and are stored in the `$WORKFLOW_SCRATCH_DIR/embedding_staging` directory. Again, the embeddings are uploaded as iRODS collections, but this time with the `owie` (Open Web Index Embeddings) resource type.

### Remote index [[repo](https://opencode.it4i.eu/openwebsearcheu-public/workflows/remote-index-workflow)]

We also create a ["remote index" representation](dnt:remote-index) of the Open Web Index that can be queried remotely using tools like DuckDB or Spark. Like the regular index shards, the remote index is also created using the [Open Web Indexer](https://opencode.it4i.eu/openwebsearcheu-public/open-web-indexer). The remote index representation is created within Spark, and the final representation is copied to a central S3 instance using [Hadoop DistCp](https://hadoop.apache.org/docs/stable/hadoop-distcp/DistCp.html). See our [documentation on the remote index](dnt:remote-index) for more information on how to access the remote index.

### Cleanup [[repo](https://opencode.it4i.eu/openwebsearcheu-public/workflows/cleanup-workflow)]

As a final step in the daily workflow, we clean up the `$WORKFLOW_SCRATCH_DIR` to remove all intermediate files that are no longer necessary.

## Workflow Orchestration

The daily preprocessing and indexing workflows are managed in [LEXIS](https://portal.lexis.tech). Under the hood, LEXIS runs [Apache Airflow](https://airflow.apache.org/), and LEXIS workflows are represented as Airflow DAGs. Our DAG definitions can be found [on GitLab](https://opencode.it4i.eu/openwebsearcheu-public/workflows/airflow-dags). HPC batch jobs are scheduled on HPC infrastructure using [HEAppE](https://heappe.eu/). An overview of the Airflow DAGs, and how they interact with the HPC centers, can be found below.

```{image} figures/workflow-orchestration.png
```
<br><br>
When a workflow is started for a specific data center and date, we first perform a **resource estimation** step. This counts the number of WARC files for that specific date in the corresponding S3 location, and tries to estimate how many nodes we will need to run each of the batch jobs (e.g. preprocessing or indexing), and how long this will take. If no WARC files are found, the workflow is terminated.

This information is used to configure each of the HPC batch jobs. For each HPC job that is scheduled, the workflow makes a call to the data center's HEAppE endpoint to queue the job (often through Slurm). The workflow then enters a waiting state, checking periodically whether the HPC job has finished.

After the HPC job has finished, we check the job status through HEAppE, and if the job failed we restart the HPC job or fail the entire workflow. If the job was successful, and produced datasets that need to be stored in iRODS, we fetch a file `collections.csv` from the HPC working directory that contains an overview of the created collections. For each collection, LEXIS issues a request for the data to be staged to iRODS through the DDI layer. Again, the workflow waits until it receives confirmation that the data was uploaded to iRODS.

Throughout the duration of the job, a set of [custom LEXIS operators](https://opencode.it4i.eu/lexis-platform/orchestrator/airflow-provider) make sure that active session tokens are refreshed periodically, so that the workflow does not fail due to an inactive session token.

### Workflow Configuration

The workflows are configured using [a JSON file](https://opencode.it4i.eu/openwebsearcheu-public/workflows/airflow-dags/-/blob/main/dags/params_v2.json), which contains the following fields:

- `workflows`: the data center specific workflows that we are currently running (separated into `main` and `test` workflows). Each workflow is defined as:
  ```json
  "workflow_name": {
      "crawl_location": "data_center_x",
      "hpc_location": "data_center_y",
      "storage_location": "data_center_z",
      "test_object_path": "workflow-test"
  }
  ```
  Each of the `*_location` fields refers to an Airflow connection, configured for a specific crawl location (S3), HPC location (HEAppE) or storage location (iRODS). See [the section on connections below](#connections) for more information.

  The `test_object_path` is only required for `test` workflows. It refers to a static path in S3 where a small set of test files reside, which can be used for test or development workflows so we do not have to depend on daily crawl data for these cases.
- `hpc_tasks`: the set of HPC tasks we run for each workflow (described in the previous [Workflows](#workflows) section). Each task is defined as, for example:
  ```json
  "task_name": {
      "workflow_dir": "task-workflow-directory",
      "resources": {
          "warcs_per_node_per_hour": 50,
          "target_walltime": 14400,
          "min_nodes": 2,
          "max_nodes": 10,
          "node_type": "gpu",
          "is_array_job": true
      },
      "max_retries": 5,
      "stage_ddi": true,
      "depends_on": [{
          "task": "previous_task_name",
          "collection_name": "main",
          "resource_type": "owi"
      }]
  }
  ```
  The following parameters are supported:
  - `workflow_dir`: the subdirectory in `$PROJECT_DIR/workflows/{prod,dev}` that describes the workflow (and contains the `entrypoint.sh` script)
  - `resources`: a configuration that is used to perform resource estimation based on the number of WARC files that were crawled on a day, and prepare the Slurm allocation (e.g. queue type, number of nodes, walltime)
  - `max_retries`: how often a HPC task can be retried if it fails (e.g. for Resilipipe jobs that timed out but can pick up where they left off by processing only new WARC files)
  - `stage_ddi`: whether the workflow produces collections that should be pushed to iRODS. If `true`, we read a `collections.csv` file from the HPC job's working directory and dynamically stage the found collections using the LEXIS DDI layer.
  - `depends_on`: which task needs to run before this task can start. For instance, `indexing` depends on `resilipipe`. If a `collection_name` and `resource_type` are provided, we fetch the iRODS collection identifier for the corresponding collection and inject it into HPC environment variables and iRODS metadata (to support data lineage).

  The `hpc_tasks` configuration is overriden in dedicated `params_v2_dev.json` and `params_v2_test.json` files, e.g. to use HPC test queues or reduce resource requirements, in order to simplify and speed up test workflows.
- `heappe_job_config`: the actual configuration that will be used to schedule HPC jobs through HEAppE. This contains the computed resource requirements and other Slurm parameters, as well as environment variables with S3 credentials and task-specific values (such as the `$WORKFLOW_SCRATCH_DIR` we saw earlier).
- `ddi_source`: the configuration that describes the HPC source filesystem, which is used when copying files to iRODS using the DDI layer.
- `ddi_target`: the configuration that describes the target iRODS filesystem when copying files using the DDI layer, as well as the Datacite metadata that should be attached to the created dataset.

### Connections

Data center connections are configured and managed through [Airflow Connections](https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html). Our workflows can be configured dynamically, choosing different data centers for crawl storage, HPC processing and iRODS storage. This allows us to, for example, even run workflows for sites that only perform crawling, by choosing a different data center for the HPC and iRODS parts (though it is most efficient to keep all processing local to a single data center to reduce data transfer).

#### Crawl storage (S3)

Crawl storage refers to an S3 endpoint and is defined as an "Amazon Web Services" connection (requires the [AWS Airflow provider](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/index.html)). The following fields are required:
- Connection ID: a unique identifier for this S3 endpoint. Should be prefixed with `s3_`, e.g.: `s3_data_center_x`
- AWS Access Key ID: the S3 access key
- AWS Secret Access Key: the S3 secret

Additionally, the following configuration is filled in the "Extra" information field:
```json
{
  "endpoint_url": "https://s3.example.com",
  "region_name": "us-east-1",
  "service_config": {
    "s3": {
      "bucket_name": "s3-bucket-name"
    }
  }
}
```

#### HPC center (HEAppE)

The HPC center configuration mostly describes how HEAppE is configured for that specific data center. We use the "HTTP" connection type. The following parameters are required:
- Connection ID: a unique identifier for this HPC center. Should be prefixed with `hpc_`, e.g.: `hpc_data_center_y`
- Login: the Slurm account number under which HPC jobs should be submitted

The following HEAppE configuration is specified in the "Extra" field:
```json
{
  "system_name": "Cluster Name",
  "heappe_uri": "https://heappe.example.com",
  "cluster_id": 1,
  "workflow_dir": "/path/to/project/storage",
  "scratch_dir": "/path/to/scratch",
  "node_type_ids": {
    "cpu": 1,
    "cpu_test": 2,
    "gpu": 3,
    "gpu_test": 4
  }
}
```

The parameters should be configured as follows:
- `system_name`: the name of the HPC cluster as defined in the LEXIS project resources (used for HPC-iRODS data transfer)
- `heappe_uri`: base endpoint of the HEAppE API for this data center
- `cluster_id`: the ID of the specific cluster of the data center on which to schedule jobs
- `workflow_dir`: the directory in which project files and software are stored (in particular: the `workflows` and `software` directories described earlier)
- `scratch_dir`: the directory in which scratch files can be created for intermediate data store
- `node_type_ids`: for each node type (or Slurm queue) necessary for running the workflows, the ID with which they are registered in HEAppE

To determine the right values for `cluster_id` and `node_type_ids`, use [the `/heappe/ClusterInformation/ListAvailableClusters` API endpoint](https://heappe.it4i.cz/docs/pages/restApi/restApi.html#get--heappe-ClusterInformation-ListAvailableClusters).

#### Dataset storage (iRODS)

Finally, the output datasets are copied to iRODS through the LEXIS DDI layer. This is also registered with the "HTTP" connection type, and the following required fields:
- Connection ID: a unique identifier for this iRODS storage zone. Should be prefixed with `irods_`, e.g.: `irods_data_center_z`
- Login: the project/resource name associated with your iRODS storage (set up in LEXIS)

The "Extra" field contains a single value:
```json
{
  "system_name": "iRODS Storage Name"
}
```
This `system_name` is configured in LEXIS as well (under project resources).
