{ "cells": [ { "cell_type": "markdown", "id": "0", "metadata": {}, "source": [ "# Strategies for climatology calculations\n", "\n", "This notebook is motivated by\n", "[this post](https://discourse.pangeo.io/t/understanding-optimal-zarr-chunking-scheme-for-a-climatology/2335)\n", "on the Pangeo discourse forum.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "1", "metadata": { "tags": [] }, "outputs": [], "source": [ "import dask.array\n", "import pandas as pd\n", "import xarray as xr\n", "\n", "import flox\n", "import flox.xarray" ] }, { "cell_type": "markdown", "id": "2", "metadata": {}, "source": [ "Let's first create an example Xarray Dataset representing the OISST dataset,\n", "with chunk sizes matching that in the post.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "3", "metadata": {}, "outputs": [], "source": [ "oisst = xr.DataArray(\n", " dask.array.ones((14532, 720, 1440), chunks=(20, -1, -1)),\n", " dims=(\"time\", \"lat\", \"lon\"),\n", " coords={\"time\": pd.date_range(\"1981-09-01 12:00\", \"2021-06-14 12:00\", freq=\"D\")},\n", " name=\"sst\",\n", ")\n", "oisst" ] }, { "cell_type": "markdown", "id": "4", "metadata": {}, "source": [ "To account for Feb-29 being present in some years, we'll construct a time vector to group by as \"mmm-dd\" string.\n", "\n", "```{seealso}\n", "For more options, see [this great website](https://strftime.org/).\n", "```" ] }, { "cell_type": "code", "execution_count": null, "id": "5", "metadata": {}, "outputs": [], "source": [ "day = oisst.time.dt.strftime(\"%h-%d\").rename(\"day\")\n", "day" ] }, { "cell_type": "markdown", "id": "6", "metadata": {}, "source": [ "## First, `method=\"map-reduce\"`\n", "\n", "The default\n", "[method=\"map-reduce\"](https://flox.readthedocs.io/en/latest/implementation.html#method-map-reduce)\n", "doesn't work so well. We aggregate all days in a single ~3GB chunk.\n", "\n", "For this to work well, we'd want smaller chunks in space and bigger chunks in\n", "time.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "7", "metadata": {}, "outputs": [], "source": [ "flox.xarray.xarray_reduce(\n", " oisst,\n", " day,\n", " func=\"mean\",\n", " method=\"map-reduce\",\n", ")" ] }, { "cell_type": "markdown", "id": "8", "metadata": {}, "source": [ "### Rechunking for map-reduce\n", "\n", "We can split each chunk along the `lat`, `lon` dimensions to make sure the\n", "output chunk sizes are more reasonable\n" ] }, { "cell_type": "code", "execution_count": null, "id": "9", "metadata": {}, "outputs": [], "source": [ "flox.xarray.xarray_reduce(\n", " oisst.chunk({\"lat\": -1, \"lon\": 120}),\n", " day,\n", " func=\"mean\",\n", " method=\"map-reduce\",\n", ")" ] }, { "cell_type": "markdown", "id": "10", "metadata": {}, "source": [ "But what if we didn't want to rechunk the dataset so drastically (note the 10x\n", "increase in tasks). For that let's try `method=\"cohorts\"`\n", "\n", "## `method=\"cohorts\"`\n", "\n", "We can take advantage of patterns in the groups here \"day of year\".\n", "Specifically:\n", "\n", "1. The groups at an approximately periodic interval, 365 or 366 days\n", "2. The chunk size 20 is smaller than the period of 365 or 366. This means, that\n", " to construct the mean for days 1-20, we just need to use the chunks that\n", " contain days 1-20.\n", "\n", "This strategy is implemented as\n", "[method=\"cohorts\"](https://flox.readthedocs.io/en/latest/implementation.html#method-cohorts)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "11", "metadata": {}, "outputs": [], "source": [ "flox.xarray.xarray_reduce(\n", " oisst,\n", " day,\n", " func=\"mean\",\n", " method=\"cohorts\",\n", ")" ] }, { "cell_type": "markdown", "id": "12", "metadata": {}, "source": [ "By default cohorts doesn't work so well for this problem because the period\n", "isn't regular (365 vs 366) and the period isn't divisible by the chunk size. So\n", "the groups end up being \"out of phase\" (for a visual illustration\n", "[click here](https://flox.readthedocs.io/en/latest/implementation.html#method-cohorts)).\n", "Now we have the opposite problem: the chunk sizes on the output are too small.\n", "\n", "Let us inspect the cohorts" ] }, { "cell_type": "code", "execution_count": null, "id": "13", "metadata": {}, "outputs": [], "source": [ "# integer codes for each \"day\"\n", "codes, _ = pd.factorize(day.data)\n", "preferred_method, cohorts = flox.core.find_group_cohorts(\n", " labels=codes,\n", " chunks=(oisst.chunksizes[\"time\"],),\n", ")\n", "print(len(cohorts))" ] }, { "cell_type": "markdown", "id": "14", "metadata": {}, "source": [ "Looking more closely, we can see many cohorts with a single entry. " ] }, { "cell_type": "code", "execution_count": null, "id": "15", "metadata": {}, "outputs": [], "source": [ "cohorts.values()" ] }, { "cell_type": "markdown", "id": "16", "metadata": {}, "source": [ "## Rechunking data for cohorts\n", "\n", "Can we fix the \"out of phase\" problem by rechunking along time?\n", "\n", "First lets see where the current chunk boundaries are" ] }, { "cell_type": "code", "execution_count": null, "id": "17", "metadata": {}, "outputs": [], "source": [ "oisst.chunksizes[\"time\"][:10]" ] }, { "cell_type": "markdown", "id": "18", "metadata": {}, "source": [ "We'll choose to rechunk such that a single month in is a chunk. This is not too different from the current chunking but will help your periodicity problem" ] }, { "cell_type": "code", "execution_count": null, "id": "19", "metadata": {}, "outputs": [], "source": [ "newchunks = xr.ones_like(day).astype(int).resample(time=\"ME\").count()" ] }, { "cell_type": "code", "execution_count": null, "id": "20", "metadata": {}, "outputs": [], "source": [ "rechunked = oisst.chunk(time=tuple(newchunks.data))" ] }, { "cell_type": "markdown", "id": "21", "metadata": {}, "source": [ "And now our cohorts contain more than one group, *and* there is a substantial reduction in number of cohorts **162 -> 12**\n" ] }, { "cell_type": "code", "execution_count": null, "id": "22", "metadata": {}, "outputs": [], "source": [ "preferred_method, new_cohorts = flox.core.find_group_cohorts(\n", " labels=codes,\n", " chunks=(rechunked.chunksizes[\"time\"],),\n", ")\n", "# one cohort per month!\n", "len(new_cohorts)" ] }, { "cell_type": "code", "execution_count": null, "id": "23", "metadata": {}, "outputs": [], "source": [ "preferred_method" ] }, { "cell_type": "code", "execution_count": null, "id": "24", "metadata": {}, "outputs": [], "source": [ "new_cohorts.values()" ] }, { "cell_type": "markdown", "id": "25", "metadata": {}, "source": [ "Now the groupby reduction **looks OK** in terms of number of tasks but remember\n", "that rechunking to get to this point involves some communication overhead.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "26", "metadata": {}, "outputs": [], "source": [ "flox.xarray.xarray_reduce(rechunked, day, func=\"mean\", method=\"cohorts\")" ] }, { "cell_type": "markdown", "id": "27", "metadata": {}, "source": [ "flox's heuristics will choose `\"cohorts\"` automatically!" ] }, { "cell_type": "code", "execution_count": null, "id": "28", "metadata": {}, "outputs": [], "source": [ "flox.xarray.xarray_reduce(rechunked, day, func=\"mean\")" ] }, { "cell_type": "markdown", "id": "29", "metadata": {}, "source": [ "## How about other climatologies?\n", "\n", "Let's try monthly\n" ] }, { "cell_type": "code", "execution_count": null, "id": "30", "metadata": {}, "outputs": [], "source": [ "flox.xarray.xarray_reduce(oisst, oisst.time.dt.month, func=\"mean\")" ] }, { "cell_type": "markdown", "id": "31", "metadata": {}, "source": [ "This looks great. Why?\n", "\n", "It's because each chunk (size 20) is smaller than number of days in a typical\n", "month. `flox` initially applies the groupby-reduction blockwise. For the chunk\n", "size of 20, we will have at most 2 groups in each chunk, so the initial\n", "blockwise reduction is quite effective - at least a 10x reduction in size from\n", "20 elements in time to at most 2 elements in time.\n", "\n", "For this kind of problem, `\"map-reduce\"` works quite well.\n" ] } ], "metadata": { "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3" } }, "nbformat": 4, "nbformat_minor": 5 }