feat: Add support for window_by TVF window semantics #1117

Open
zhenzhongxu opened this issue Aug 28, 2024 · 16 comments

@zhenzhongxu

zhenzhongxu commented Aug 28, 2024

What happened?

I have a simple example in this notebook showing that the window semantics are not supported by the compiler yet. Is this something we can add relatively easily?

https://github.com/zhenzhongxu/composable-data-architecture/blob/3c5324466c4355cda98d2443fc1f01b011a5241b/ibis-substrait-example.ipynb

import ibis
from ibis import _
from ibis_substrait.compiler.core import SubstraitCompiler

# Schemas for the Person, Auction, and Bid source tables
person_schema = ibis.schema({
    "id": "int64",
    "name": "string",
    "emailaddress": "string",
    "creditcard": "string",
    "city": "string",
    "state": "string",
})

auction_schema = ibis.schema({
    "id": "int64",
    "itemname": "string",
    "description": "string",
    "initialbid": "float64",
    "reserve": "float64",
    "expires": "timestamp(3)",
    "seller": "int64",
})

bid_schema = ibis.schema({
    "auction": "int64",
    "bidder": "int64",
    "price": "float64",
    "datetime": "timestamp(3)",
})

# Create source tables
person_table = ibis.table(name="Person", schema=person_schema)
auction_table = ibis.table(name="Auction", schema=auction_schema)
bid_table = ibis.table(name="Bid", schema=bid_schema)

# Create SubstraitCompiler object
compiler = SubstraitCompiler()

# NOTE: `_ is not None` is a plain Python identity check that evaluates to True,
# so these filters keep every row; a per-column null check would be e.g. `_.price.notnull()`.

# Over window: mean price over the 10 seconds up to each row, ordered by datetime
over_window = bid_table.filter(_ is not None)[_.price.mean().over(range=(-ibis.interval(seconds=10), 0), order_by=_.datetime).name("avg_price")]
compiler.compile(over_window)

# Hop window: 10-second windows that slide every 2 seconds
# (this call raises NotImplementedError; see the log output below)
hop_window = bid_table.filter(_ is not None).window_by(time_col=_.datetime).hop(size=ibis.interval(seconds=10), slide=ibis.interval(seconds=2)).agg(_.price.mean().name("avg_price"))

compiler.compile(hop_window)

What version of ibis-substrait are you using?

v4.0.1

What substrait consumer(s) are you using, if any?

N/A

Relevant log output

---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
Cell In[47], line 14
     10 # hop window
     11 hop_window = bid_table.filter(_ is not None).window_by(time_col=_.datetime).hop(size=ibis.interval(seconds=10), slide=ibis.interval(seconds=2)).agg(_.price.mean().name("avg_price"))
---> 14 compiler.compile(hop_window)

File ~/miniconda3/envs/ibis-dev-arm64/lib/python3.12/site-packages/ibis_substrait/compiler/core.py:222, in SubstraitCompiler.compile(self, expr, **kwargs)
    217 from .translate import translate
    219 expr_schema = expr.schema()
    220 rel = stp.PlanRel(
    221     root=stalg.RelRoot(
--> 222         input=translate(expr.op(), compiler=self, **kwargs),
    223         names=translate(expr_schema).names,
    224     )
    225 )
    226 ver = vparse(__substrait_version__)
    227 return stp.Plan(
    228     version=stp.Version(
    229         major_number=ver.major,
   (...)
    256     relations=[rel],
    257 )

File ~/miniconda3/envs/ibis-dev-arm64/lib/python3.12/functools.py:907, in singledispatch.<locals>.wrapper(*args, **kw)
    903 if not args:
    904     raise TypeError(f'{funcname} requires at least '
    905                     '1 positional argument')
--> 907 return dispatch(args[0].__class__)(*args, **kw)

File ~/miniconda3/envs/ibis-dev-arm64/lib/python3.12/site-packages/ibis_substrait/compiler/translate.py:55, in translate(*args, **kwargs)
     53 @functools.singledispatch
     54 def translate(*args: Any, **kwargs: Any) -> Any:
---> 55     raise NotImplementedError(*args)

NotImplementedError: <ibis.expr.operations.temporal_windows.WindowAggregate object at 0x16d904dd0>
@zhenzhongxu added the bug label Aug 28, 2024
@gforsyth changed the title from "feat: Add support for window semantics" to "feat: Add support for hop window semantics" Aug 28, 2024
@gforsyth changed the title from "feat: Add support for hop window semantics" to "feat: Add support for window TVF semantics" Aug 28, 2024
@gforsyth changed the title from "feat: Add support for window TVF semantics" to "feat: Add support for window_by TVF semantics" Aug 28, 2024
@gforsyth
Member

Hey @zhenzhongxu -- we can add a target for the window_by TVF stuff, but I don't know how to spell those kinds of functions in Substrait -- do you have a reference for what the target should look like?

We already support batch window functions.

@zhenzhongxu
Author

@gforsyth, does this help? Alternatively, we can look at the Calcite plan on Flink.

@zhenzhongxu
Author

Also adding @richtia and @EpsilonPrime for guidance on the Substrait side.

@gforsyth
Member

I'm reasonably familiar with the flink-style TVF windowing, but I'm not clear on whether there's a way to express that in Substrait -- so yeah, if @EpsilonPrime or @richtia have any pointers.

If there isn't currently a way to express that in Substrait, then the first step will be adding it upstream. Then it should be straightforward to target from here.

@EpsilonPrime

EpsilonPrime commented Aug 28, 2024

{
  "extensionUris": [
    {
      "extensionUriAnchor": 1,
      "uri": "/functions_arithmetic.yaml"
    }
  ],
  "extensions": [
    {
      "extensionFunction": {
        "extensionUriReference": 1,
        "functionAnchor": 1,
        "name": "row_number"
      }
    }
  ],
  "relations": [
    {
      "root": {
        "input": {
          "project": {
            "common": {
              "emit": {
                "outputMapping": [
                  3,
                  4,
                  5,
                  6
                ]
              }
            },
            "input": {
              "read": {
                "common": {
                  "direct": {}
                },
                "baseSchema": {
                  "names": [
                    "user_id",
                    "name",
                    "paid_for_service"
                  ],
                  "struct": {
                    "types": [
                      {
                        "string": {
                          "nullability": "NULLABILITY_REQUIRED"
                        }
                      },
                      {
                        "string": {
                          "nullability": "NULLABILITY_REQUIRED"
                        }
                      },
                      {
                        "bool": {
                          "nullability": "NULLABILITY_REQUIRED"
                        }
                      }
                    ],
                    "nullability": "NULLABILITY_REQUIRED"
                  }
                },
                "namedTable": {
                  "names": [
                    "users"
                  ]
                }
              }
            },
            "expressions": [
              {
                "selection": {
                  "directReference": {
                    "structField": {}
                  },
                  "rootReference": {}
                }
              },
              {
                "selection": {
                  "directReference": {
                    "structField": {
                      "field": 1
                    }
                  },
                  "rootReference": {}
                }
              },
              {
                "selection": {
                  "directReference": {
                    "structField": {
                      "field": 2
                    }
                  },
                  "rootReference": {}
                }
              },
              {
                "windowFunction": {
                  "functionReference": 1,
                  "sorts": [
                    {
                      "expr": {
                        "selection": {
                          "directReference": {
                            "structField": {
                              "field": 1
                            }
                          },
                          "rootReference": {}
                        }
                      },
                      "direction": "SORT_DIRECTION_ASC_NULLS_FIRST"
                    }
                  ],
                  "upperBound": {
                    "unbounded": {}
                  },
                  "lowerBound": {
                    "unbounded": {}
                  },
                  "outputType": {
                    "i64": {
                      "nullability": "NULLABILITY_REQUIRED"
                    }
                  },
                  "invocation": 3
                }
              }
            ]
          }
        },
        "names": [
          "user_id",
          "name",
          "paid_for_service",
          "row_number"
        ]
      }
    }
  ],
  "version": {
    "minorNumber": 52,
    "producer": "spark-substrait-gateway"
  }
}

@gforsyth
Member

Thanks, @EpsilonPrime -- I think that's a standard window function, which is a different beast than the table-valued functions used by some of the streaming engines.

I haven't seen any chatter on the Substrait issue tracker about anyone implementing this.

We can probably desugar the hop and tumble operations on the Ibis side, but some of the other TVF windows are harder to write in batch form.
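As a rough sketch of what desugaring hop and tumble could mean, the pure-Python code below (not Ibis or ibis-substrait API) expands each event into every hop window that contains it and then averages per window, which is what the Flink `HOP` query in this thread computes with `GROUP BY window_start, window_end`. It assumes integer millisecond timestamps and windows aligned to multiples of the slide; a tumble window is the special case `slide_ms == size_ms`. The function names are hypothetical.

```python
from collections import defaultdict


def hop_window_starts(ts_ms: int, size_ms: int, slide_ms: int) -> list:
    """Return the start of every [start, start + size_ms) window containing ts_ms.

    Windows start at multiples of slide_ms (an assumption); tumble is slide_ms == size_ms.
    """
    if size_ms <= 0 or slide_ms <= 0:
        raise ValueError("size and slide must be positive")
    # The latest window that can contain ts_ms starts at the slide boundary at or before it.
    start = ts_ms - (ts_ms % slide_ms)
    starts = []
    # Walk backwards one slide at a time while the window still covers ts_ms.
    while start > ts_ms - size_ms:
        starts.append(start)
        start -= slide_ms
    return sorted(starts)


def hop_avg(events, size_ms: int, slide_ms: int) -> dict:
    """Batch equivalent of AVG(price) GROUP BY window_start, window_end over HOP windows.

    `events` is an iterable of (timestamp_ms, price) pairs.
    """
    acc = defaultdict(lambda: [0.0, 0])  # (window_start, window_end) -> [sum, count]
    for ts_ms, price in events:
        for start in hop_window_starts(ts_ms, size_ms, slide_ms):
            bucket = acc[(start, start + size_ms)]
            bucket[0] += price
            bucket[1] += 1
    return {window: total / count for window, (total, count) in sorted(acc.items())}
```

Each event lands in `size_ms / slide_ms` windows when the size is a multiple of the slide, which is why hop can be written as a fan-out followed by an ordinary grouped aggregate.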

@EpsilonPrime

The only other way I know of to do window-related behavior is with the ConsistentPartitionWindowRel -- https://github.com/substrait-io/substrait/blob/main/proto/substrait/algebra.proto#L272.

@zhenzhongxu
Author

Let me grab the Calcite plan for the window TVF so we can compare the standard window vs streaming window. Will circle back shortly.

@gforsyth added the feature label and removed the bug label Aug 28, 2024
@zhenzhongxu
Author

zhenzhongxu commented Aug 28, 2024

For the streaming Ibis declaration

hop_window = bid_table.filter(_ is not None).window_by(time_col=_.datetime).hop(size=ibis.interval(seconds=10), slide=ibis.interval(seconds=2)).agg(_.price.mean().name("avg_price"))

the streaming Flink SQL backend generates:

CREATE TABLE Bid (
    `auction` BIGINT, 
    `bidder` BIGINT, 
    `price` DOUBLE, 
    `datetime` TIMESTAMP(3),
    WATERMARK FOR datetime AS `datetime` - INTERVAL '1' SECOND
) WITH ('connector' = 'datagen');

SELECT window_start, window_end, AVG(price) AS avg_price
FROM TABLE(HOP(TABLE Bid, DESCRIPTOR(datetime), INTERVAL '2' SECONDS, INTERVAL '10' SECONDS))
GROUP BY window_start, window_end;
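The `WATERMARK` clause and the `HOP` TVF above are the streaming-only pieces. As a rough illustration of what the watermark expresses, here is a simplified pure-Python model (an assumption for illustration, not Flink's implementation): the watermark trails the largest `datetime` seen so far by one second, and a window is treated as complete once the watermark reaches its end. The class and function names are hypothetical.

```python
from typing import Optional


class BoundedOutOfOrdernessWatermark:
    """Simplified model of `WATERMARK FOR datetime AS datetime - INTERVAL '1' SECOND`.

    The watermark trails the largest event timestamp seen so far by a fixed delay.
    """

    def __init__(self, delay_ms: int) -> None:
        self.delay_ms = delay_ms
        self.max_ts_ms: Optional[int] = None

    def observe(self, ts_ms: int) -> Optional[int]:
        # Late (out-of-order) events never move the watermark backwards.
        if self.max_ts_ms is None or ts_ms > self.max_ts_ms:
            self.max_ts_ms = ts_ms
        return self.current()

    def current(self) -> Optional[int]:
        if self.max_ts_ms is None:
            return None
        return self.max_ts_ms - self.delay_ms


def window_is_complete(window_end_ms: int, watermark_ms: Optional[int]) -> bool:
    # Simplification: a window [start, end) is final once the watermark reaches end.
    return watermark_ms is not None and watermark_ms >= window_end_ms
```

A batch plan has no equivalent of this state: it sees all rows at once, so it never needs to decide when a window is safe to emit.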

Running this through the Flink planner (Calcite) produces the following:

| == Abstract Syntax Tree ==
LogicalAggregate(group=[{0, 1}], avg_price=[AVG($2)])
+- LogicalProject(window_start=[$4], window_end=[$5], price=[$2])
   +- LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($3), 2000:INTERVAL SECOND, 10000:INTERVAL SECOND)], rowType=[RecordType(BIGINT auction, BIGINT bidder, DOUBLE price, TIMESTAMP(3) *ROWTIME* datetime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
      +- LogicalProject(auction=[$0], bidder=[$1], price=[$2], datetime=[$3])
         +- LogicalWatermarkAssigner(rowtime=[datetime], watermark=[-($3, 1000:INTERVAL SECOND)])
            +- LogicalTableScan(table=[[default_catalog, default_database, Bid]])

== Optimized Physical Plan ==
Calc(select=[window_start, window_end, avg_price])
+- GlobalWindowAggregate(window=[HOP(slice_end=[$slice_end], size=[10 s], slide=[2 s])], select=[AVG((sum$0, count$1)) AS avg_price, COUNT(count1$2) AS window_start, start('w$) AS window_end])
   +- Exchange(distribution=[single])
      +- LocalWindowAggregate(window=[HOP(time_col=[datetime], size=[10 s], slide=[2 s])], select=[AVG(price) AS (sum$0, count$1), COUNT(*) AS count1$2, slice_end('w$) AS $slice_end])
         +- Calc(select=[price, datetime])
            +- WatermarkAssigner(rowtime=[datetime], watermark=[-(datetime, 1000:INTERVAL SECOND)])
               +- TableSourceScan(table=[[default_catalog, default_database, Bid]], fields=[auction, bidder, price, datetime])

== Optimized Execution Plan ==
Calc(select=[window_start, window_end, avg_price])
+- GlobalWindowAggregate(window=[HOP(slice_end=[$slice_end], size=[10 s], slide=[2 s])], select=[AVG((sum$0, count$1)) AS avg_price, COUNT(count1$2) AS window_start, start('w$) AS window_end])
   +- Exchange(distribution=[single])
      +- LocalWindowAggregate(window=[HOP(time_col=[datetime], size=[10 s], slide=[2 s])], select=[AVG(price) AS (sum$0, count$1), COUNT(*) AS count1$2, slice_end('w$) AS $slice_end])
         +- Calc(select=[price, datetime])
            +- WatermarkAssigner(rowtime=[datetime], watermark=[(datetime - 1000:INTERVAL SECOND)])
               +- TableSourceScan(table=[[default_catalog, default_database, Bid]], fields=[auction, bidder, price, datetime])
 |

Note that the watermark assigner and table-valued functions are specific to streaming operations, so they are unlikely to be available in Substrait today. We have two options to unblock this:

  1. Until more than one streaming engine accepts Substrait as input, we need to rely on a middle layer that converts a standard batch plan into a streaming logical plan. For example, Arroyo on DataFusion does exactly this (tagging @mwylde). This is the safer bet until Substrait gains more momentum for streaming.
  2. Once the first engine is willing to commit to taking streaming-specific Substrait plans as input, we need to implement these streaming-specific operators and relations in Substrait and then roll them into the Ibis compiler. @mwylde, we should talk about this more since it could simplify the Arroyo architecture long term, but I'd suggest going with option 1 to keep backward compatibility for now.

So if we go with option 1, can we provide a compilation switch (e.g., batch/streaming modes)? Batch mode would generate a standard window without a watermark or TVF, and streaming mode would raise a not-implemented exception for now (until we go with option 2).

Thoughts?
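A minimal sketch of the proposed batch/streaming switch, assuming a hypothetical `CompileMode` enum and `compile_with_mode` wrapper (neither exists in ibis-substrait). Batch mode delegates to a batch compiler such as `SubstraitCompiler().compile`; streaming mode raises until streaming operators exist in Substrait.

```python
from enum import Enum
from typing import Any, Callable


class CompileMode(Enum):
    """Hypothetical compilation switch; not part of ibis-substrait."""

    BATCH = "batch"
    STREAMING = "streaming"


def compile_with_mode(
    expr: Any,
    compile_batch: Callable[[Any], Any],
    mode: CompileMode = CompileMode.BATCH,
) -> Any:
    """Check the requested mode before handing the expression to a batch compiler.

    `compile_batch` would be something like `SubstraitCompiler().compile`.
    """
    if mode is CompileMode.STREAMING:
        # Option 2: streaming-specific Substrait operators are not defined yet.
        raise NotImplementedError(
            "streaming mode needs watermark and window TVF operators in Substrait"
        )
    # Batch mode: emit a standard plan with no watermark or TVF.
    return compile_batch(expr)
```

Batch mode alone does not make `window_by` compile: the `WindowAggregate` op would still need to be desugared into standard batch operations before `compile_batch` sees it.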

@mwylde

mwylde commented Aug 29, 2024

I think option 1 would be pretty straightforward for Arroyo. Our streaming window functions (hop, tumble, etc.) are expressed in SQL as normal scalar functions:

select count(*), tumble(interval '10 seconds') as window
from events
group by window;

This gets rewritten into a logical plan with a separate window operator:

[Image: rewritten logical plan with a separate window operator]

If the window functions are preserved in the Substrait plan as normal scalar functions, we could do that rewrite on the Substrait plan itself.
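Roughly, the rewrite described here finds a window scalar function among the grouping keys of an aggregate and lifts it into a dedicated window operator. The sketch below uses toy plan classes (`Call`, `Aggregate`, `WindowAggregate`), which are hypothetical stand-ins, not Substrait protobuf messages or Arroyo's internal types; the set of window function names is likewise an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Call:
    """Toy scalar function call, e.g. tumble('10 seconds')."""

    name: str
    args: tuple = ()


@dataclass(frozen=True)
class Aggregate:
    """Toy aggregate node: GROUP BY group_keys, computing measures."""

    input: object
    group_keys: tuple
    measures: tuple


@dataclass(frozen=True)
class WindowAggregate:
    """Hypothetical dedicated window operator produced by the rewrite."""

    input: object
    window: Call
    group_keys: tuple
    measures: tuple


# Scalar functions treated as window markers (assumed names).
WINDOW_FUNCTIONS = {"tumble", "hop", "session"}


def is_window_call(key: object) -> bool:
    return isinstance(key, Call) and key.name in WINDOW_FUNCTIONS


def lift_window(plan: object) -> object:
    """Rewrite Aggregate(GROUP BY tumble(...), ...) into a WindowAggregate.

    Only the top-level node is inspected; a real rewrite would walk the whole plan.
    """
    if not isinstance(plan, Aggregate):
        return plan
    windows = [k for k in plan.group_keys if is_window_call(k)]
    if not windows:
        return plan  # ordinary GROUP BY, nothing to lift
    if len(windows) > 1:
        raise ValueError("expected at most one window function per GROUP BY")
    other_keys = tuple(k for k in plan.group_keys if not is_window_call(k))
    return WindowAggregate(plan.input, windows[0], other_keys, plan.measures)
```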

Longer term it would be better to properly integrate windows into the plan. I'm not sure how tied that is to the particular syntax (Flink-style TVFs or our special scalar functions).

@zhenzhongxu
Author

zhenzhongxu commented Aug 29, 2024

@EpsilonPrime, can you help confirm whether the window scalar functions @mwylde mentioned are similar to the standard window functions already supported in Substrait? Or do we need an additional type field to indicate the window type (hop/sliding, tumble, session)?

Additionally, for the OVER window example below (more common in the batch world but also supported in streaming), @EpsilonPrime, can you confirm whether this is relevant to the partitioned window you mentioned earlier?

SELECT
    date,
    product_id,
    sales_amount,
    AVG(sales_amount) OVER (
        PARTITION BY product_id 
        ORDER BY date 
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) AS moving_avg_sales
FROM
    sales
ORDER BY
    product_id, date;
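To pin down the frame semantics of this query, here is a reference computation in pure Python. It only illustrates what the SQL returns, not how a Substrait consumer would execute it. Rows are assumed to be `(date, product_id, sales_amount)` tuples with ISO-formatted date strings so they sort chronologically; the function name is hypothetical.

```python
from collections import defaultdict


def moving_avg_rows(rows, preceding=2):
    """AVG(sales_amount) OVER (PARTITION BY product_id ORDER BY date
    ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW), then ORDER BY product_id, date.
    """
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[1]].append(row)  # PARTITION BY product_id
    out = []
    for product_id in sorted(partitions):
        part = sorted(partitions[product_id], key=lambda r: r[0])  # ORDER BY date
        for i, (date, pid, amount) in enumerate(part):
            # The frame holds the current row plus up to `preceding` earlier rows.
            frame = part[max(0, i - preceding): i + 1]
            out.append((date, pid, amount, sum(r[2] for r in frame) / len(frame)))
    return out
```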

My thinking if both (scalar function & OVER window) are already supported on Substrait then we can proceed to support both on the compiler?

@zhenzhongxu changed the title from "feat: Add support for window_by TVF semantics" to "feat: Add support for window_by TVF & over window semantics" Aug 29, 2024
@EpsilonPrime

It seems like you could use the window function expression syntax I first linked to handle AVG OVER, since that syntax just needs an operation with partitions and bounds.

As for hop, tumble, and session, I suspect they might require alternative bound restrictions or behaviors. I'm still looking at how those work.
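One reason session windows in particular may not fit fixed frame bounds is that their extent depends on the data: a session stays open while events keep arriving within an inactivity gap. A simplified pure-Python model, assuming integer millisecond timestamps and a hypothetical `session_windows` helper (not an API of Substrait or any engine):

```python
def session_windows(timestamps_ms, gap_ms):
    """Group event timestamps into session windows.

    A session stays open while consecutive events are less than gap_ms apart;
    each window is reported as (first_event, last_event + gap_ms).
    """
    sessions = []
    for ts in sorted(timestamps_ms):
        if sessions and ts < sessions[-1][1]:
            # Still inside the current session's inactivity gap: extend it.
            sessions[-1][1] = ts + gap_ms
        else:
            # Gap exceeded (or first event): start a new session.
            sessions.append([ts, ts + gap_ms])
    return [tuple(s) for s in sessions]
```

Unlike a `ROWS` or `RANGE` frame, which is defined relative to each row, these boundaries only become known after scanning the events themselves.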

@gforsyth
Copy link
Member

> My thinking if both (scalar function & OVER window) are already supported on Substrait then we can proceed to support both on the compiler?

This should already work in ibis-substrait.

>>> import ibis

>>> from ibis import _

>>> from ibis_substrait.compiler.core import SubstraitCompiler

>>> sales = ibis.table({"date": "date", "product_id": "str", "sales_amount": "float"}, 'sales')

>>> expr = sales.mutate(moving_avg_sales=_.sales_amount.mean().over(rows=(-2,0), group_by=_.product_id, order_by=_.date))

>>> ibis.to_sql(expr)
SELECT
  "t0"."date",
  "t0"."product_id",
  "t0"."sales_amount",
  AVG("t0"."sales_amount") OVER (PARTITION BY "t0"."product_id" ORDER BY "t0"."date" ASC ROWS BETWEEN 2 preceding AND CURRENT ROW) AS "moving_avg_sales"
FROM "sales" AS "t0"

>>> compiler = SubstraitCompiler()

>>> substrait_plan = compiler.compile(expr)

>>> substrait_plan
Substrait Plan Output
extension_uris {
  extension_uri_anchor: 1
  uri: "https://github.com/substrait-io/substrait/blob/main/extensions/functions_arithmetic.yaml"
}
extensions {
  extension_function {
    extension_uri_reference: 1
    function_anchor: 1
    name: "avg:fp64"
  }
}
relations {
  root {
    input {
      project {
        common {
          emit {
            output_mapping: 3
            output_mapping: 4
            output_mapping: 5
            output_mapping: 6
          }
        }
        input {
          read {
            common {
              direct {
              }
            }
            base_schema {
              names: "date"
              names: "product_id"
              names: "sales_amount"
              struct {
                types {
                  date {
                    nullability: NULLABILITY_NULLABLE
                  }
                }
                types {
                  string {
                    nullability: NULLABILITY_NULLABLE
                  }
                }
                types {
                  fp64 {
                    nullability: NULLABILITY_NULLABLE
                  }
                }
                nullability: NULLABILITY_REQUIRED
              }
            }
            named_table {
              names: "sales"
            }
          }
        }
        expressions {
          selection {
            direct_reference {
              struct_field {
              }
            }
            root_reference {
            }
          }
        }
        expressions {
          selection {
            direct_reference {
              struct_field {
                field: 1
              }
            }
            root_reference {
            }
          }
        }
        expressions {
          selection {
            direct_reference {
              struct_field {
                field: 2
              }
            }
            root_reference {
            }
          }
        }
        expressions {
          window_function {
            function_reference: 1
            partitions {
              selection {
                direct_reference {
                  struct_field {
                    field: 1
                  }
                }
                root_reference {
                }
              }
            }
            sorts {
              expr {
                selection {
                  direct_reference {
                    struct_field {
                    }
                  }
                  root_reference {
                  }
                }
              }
              direction: SORT_DIRECTION_ASC_NULLS_FIRST
            }
            upper_bound {
              following {
              }
            }
            lower_bound {
              preceding {
                offset: 2
              }
            }
            phase: AGGREGATION_PHASE_INITIAL_TO_RESULT
            output_type {
              fp64 {
                nullability: NULLABILITY_NULLABLE
              }
            }
            arguments {
              value {
                selection {
                  direct_reference {
                    struct_field {
                      field: 2
                    }
                  }
                  root_reference {
                  }
                }
              }
            }
          }
        }
      }
    }
    names: "date"
    names: "product_id"
    names: "sales_amount"
    names: "moving_avg_sales"
  }
}
version {
  minor_number: 54
  producer: "ibis-substrait"
}

@zhenzhongxu
Author

zhenzhongxu commented Aug 29, 2024

Thanks @gforsyth! I'll give this a try first, and we can revisit option 2 when the timing is right.

One more piece of feedback:
We have a streaming-specific OVER window:

over_window_streaming = bid_table.filter(_ is not None)[_.price.mean().over(range=(-ibis.interval(seconds=10), 0), order_by=_.datetime).name("avg_price")]

and a batch OVER window:

over_window_batch = bid_table.filter(_ is not None).mutate(avg_price=_.price.mean().over(rows=(-2,0), order_by=_.datetime))

These generate different compiled plans (in fact, the streaming one doesn't compile yet), even though the syntax differs only in minor details (selection vs. mutate, and a time range vs. a row range). Are there opportunities to converge the two APIs and avoid confusing users? (I'm putting myself in the users' shoes.) Also, let me know if you want me to file a different issue on Ibis repo.
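For reference, the two calls differ mainly in frame type. In the `ibis.to_sql` output earlier in this thread, `rows=(-2, 0)` compiles to `ROWS BETWEEN 2 preceding AND CURRENT ROW`, which counts physical rows; `range=(-ibis.interval(seconds=10), 0)` should instead correspond to a `RANGE` frame that selects rows whose ORDER BY value lies within 10 seconds of the current row's. A minimal pure-Python sketch of the two frame definitions, assuming integer millisecond order keys (function names hypothetical):

```python
def rows_frame(i, preceding_rows):
    """Row indices in a ROWS BETWEEN <preceding_rows> PRECEDING AND CURRENT ROW frame."""
    return list(range(max(0, i - preceding_rows), i + 1))


def range_frame(order_keys, i, preceding):
    """Row indices in a RANGE BETWEEN <preceding> PRECEDING AND CURRENT ROW frame.

    `order_keys` is the ORDER BY column (e.g. epoch milliseconds), sorted ascending.
    As in SQL RANGE frames, peers sharing row i's key are included.
    """
    lower = order_keys[i] - preceding
    return [j for j, key in enumerate(order_keys) if lower <= key <= order_keys[i]]
```

With timestamps `[0, 1_000, 1_500, 9_000, 12_000]`, the last row's `ROWS` frame always holds three rows, while its 10-second `RANGE` frame holds only the two rows at 9 s and 12 s; the frame size depends on event timing, which is what a streaming engine cares about.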

@EpsilonPrime
Copy link

The Substrait plan appears to be written with DataFusion's project bug/behavior in mind, so it should work for us (and, even better, it should still work after the bug is fixed). The window function syntax also matches what I tested against DataFusion.

@gforsyth
Member

gforsyth commented Sep 3, 2024

> let me know if you want me to file a different issue on Ibis repo.

This is better discussed upstream.

@zhenzhongxu changed the title from "feat: Add support for window_by TVF & over window semantics" to "feat: Add support for window_by TVF window semantics" Sep 5, 2024
Projects
Status: backlog
Development

No branches or pull requests

5 participants
@mwylde @EpsilonPrime @zhenzhongxu @gforsyth and others