Skip to content

Commit

Permalink
support remote SpiceDB
Browse files Browse the repository at this point in the history
  • Loading branch information
vroldanbet committed Sep 4, 2023
1 parent db94273 commit e82f1ca
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 37 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ proxy.key
*__failpoint_*.go
*.go__failpoint*
*.sqlite
magefiles/mage_output_file.go
2 changes: 2 additions & 0 deletions deploy/proxy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ spec:
containers:
- args:
- --secure-port=8443
- --spicedb-endpoint
- embedded://
- --backend-kubeconfig
- /opt/proxy/kubeconfig
- --cert-dir
Expand Down
23 changes: 17 additions & 6 deletions pkg/proxy/authz.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,14 @@ import (
"k8s.io/apiserver/pkg/endpoints/request"
)

func (s *Server) WithAuthorization(handler, failed http.Handler, watchClient v1.WatchServiceClient, taskHubClient backend.TaskHubClient) http.Handler {
func WithAuthorization(handler, failed http.Handler, permissionsClient v1.PermissionsServiceClient, watchClient v1.WatchServiceClient, taskHubClient backend.TaskHubClient, lockMode string) (http.Handler, error) {
if lockMode == "" {
return nil, fmt.Errorf("lock mode is undefined")
}

if !(lockMode == LockingWriteToSpiceDBAndKube || lockMode == OptimisticWriteToSpiceDBAndKube) {
return nil, fmt.Errorf("unexpected lock mode: %s", lockMode)
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -67,7 +74,7 @@ func (s *Server) WithAuthorization(handler, failed http.Handler, watchClient v1.
return
}

id, err := taskHubClient.ScheduleNewOrchestration(ctx, s.LockMode, api.WithInput(CreateObjInput{
id, err := taskHubClient.ScheduleNewOrchestration(ctx, lockMode, api.WithInput(CreateObjInput{
RequestInfo: requestInfo,
UserInfo: userInfo.(*user.DefaultInfo),
ObjectMeta: &pom.ObjectMeta,
Expand Down Expand Up @@ -130,7 +137,7 @@ func (s *Server) WithAuthorization(handler, failed http.Handler, watchClient v1.
requestInfo.APIGroup == "" &&
requestInfo.Name != "" {
go func() {
cr, err := s.SpiceDBClient.CheckPermission(ctx, &v1.CheckPermissionRequest{
cr, err := permissionsClient.CheckPermission(ctx, &v1.CheckPermissionRequest{
Consistency: &v1.Consistency{
Requirement: &v1.Consistency_MinimizeLatency{MinimizeLatency: true},
},
Expand Down Expand Up @@ -173,7 +180,7 @@ func (s *Server) WithAuthorization(handler, failed http.Handler, watchClient v1.
requestInfo.APIGroup == "" {

go func() {
lr, err := s.SpiceDBClient.LookupResources(ctx, &v1.LookupResourcesRequest{
lr, err := permissionsClient.LookupResources(ctx, &v1.LookupResourcesRequest{
Consistency: &v1.Consistency{
Requirement: &v1.Consistency_MinimizeLatency{MinimizeLatency: true},
},
Expand Down Expand Up @@ -246,7 +253,7 @@ func (s *Server) WithAuthorization(handler, failed http.Handler, watchClient v1.
for _, u := range resp.Updates {
if u.Operation == v1.RelationshipUpdate_OPERATION_TOUCH || u.Operation == v1.RelationshipUpdate_OPERATION_CREATE {
// do a check
cr, err := s.SpiceDBClient.CheckPermission(ctx, &v1.CheckPermissionRequest{
cr, err := permissionsClient.CheckPermission(ctx, &v1.CheckPermissionRequest{
Consistency: &v1.Consistency{
Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true},
// TODO
Expand Down Expand Up @@ -284,7 +291,7 @@ func (s *Server) WithAuthorization(handler, failed http.Handler, watchClient v1.
req = req.WithContext(WithAuthzData(req.Context(), &authzData))

handler.ServeHTTP(w, req)
})
}), nil
}

type requestAuthzData int
Expand Down Expand Up @@ -422,6 +429,10 @@ func (d *AuthzData) FilterResp(resp *http.Response) error {
// filter single object
filtered, err = d.FilterObject(pom.ToPartialObjectMetadata(), body)
}
if err != nil {
fmt.Println(err)
return err
}

resp.Body = io.NopCloser(bytes.NewBuffer(filtered))
resp.Header["Content-Length"] = []string{fmt.Sprint(len(filtered))}
Expand Down
6 changes: 3 additions & 3 deletions pkg/proxy/durable_activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ import (
"github.com/authzed/spicedb-kubeapi-proxy/pkg/failpoints"
)

// WriteToSpiceDBActivity writes relationships to spicedb and returns any errors.
// WriteToSpiceDB writes relationships to spicedb and returns any errors.
func (s *Server) WriteToSpiceDB(ctx task.ActivityContext) (any, error) {
var req v1.WriteRelationshipsRequest
if err := ctx.GetInput(&req); err != nil {
return nil, err
}
failpoints.FailPoint("panicWriteSpiceDB")
out, err := s.SpiceDBClient.WriteRelationships(ctx.Context(), &req)
out, err := s.opts.PermissionsClient.WriteRelationships(ctx.Context(), &req)
failpoints.FailPoint("panicSpiceDBReadResp")
return out, err
}

// WriteToKubeActivity
// WriteToKube peforms a Kube API Server POST, specified in a KubeReqInput propagated via the task.ActivityContext arg
func (s *Server) WriteToKube(ctx task.ActivityContext) (any, error) {
var req KubeReqInput
if err := ctx.GetInput(&req); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/proxy/durable_workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
lockRelationName = "workflow"
workflowResourceType = "workflow"
MaxKubeAttempts = 5
DefaultLockMode = LockingWriteToSpiceDBAndKube
OptimisticWriteToSpiceDBAndKube = "OptimisticWriteToSpiceDBAndKube"
LockingWriteToSpiceDBAndKube = "LockingWriteToSpiceDBAndKube"
WriteToSpiceDBActivity = "WriteToSpiceDBActivity"
Expand Down
90 changes: 82 additions & 8 deletions pkg/proxy/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,36 @@ package proxy
import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"time"

v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/authzed/grpcutil"
"github.com/authzed/spicedb/pkg/cmd/server"
"github.com/spf13/pflag"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials/insecure"
genericapiserver "k8s.io/apiserver/pkg/server"
apiserveroptions "k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/component-base/logs"
logsv1 "k8s.io/component-base/logs/api/v1"

apiserveroptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/klog/v2"

"github.com/authzed/spicedb-kubeapi-proxy/pkg/spicedb"
)

const (
defaultDurableTaskDatabasePath = "/tmp/dtx.sqlite"
embeddedSpiceDBEndpoint = "embedded://"
)

type Options struct {
SecureServing apiserveroptions.SecureServingOptionsWithLoopback
Authentication Authentication
Expand All @@ -35,8 +47,16 @@ type Options struct {
ServingInfo *genericapiserver.SecureServingInfo
AdditionalAuthEnabled bool

SpicedbServer server.RunnableServer
SpiceDBClient any
WatchClient v1.WatchServiceClient
PermissionsClient v1.PermissionsServiceClient
SpiceDBEndpoint string
EmbeddedSpiceDB server.RunnableServer
insecure bool
skipVerifyCA bool
token string

DurableTaskDatabasePath string
LockMode string
}

func NewOptions() *Options {
Expand All @@ -55,7 +75,12 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
o.SecureServing.AddFlags(fs)
o.Authentication.AddFlags(fs)
logsv1.AddFlags(o.Logs, fs)
fs.StringVar(&o.DurableTaskDatabasePath, "durabletask-database-path", defaultDurableTaskDatabasePath, "Path for the file representing the SQLite database used for the durable task engine.")
fs.StringVar(&o.BackendKubeconfigPath, "backend-kubeconfig", o.BackendKubeconfigPath, "The path to the kubeconfig to proxy connections to. It should authenticate the user with cluster-admin permission.")
fs.StringVar(&o.SpiceDBEndpoint, "spicedb-endpoint", "localhost:50051", "Defines the endpoint endpoint to the SpiceDB authorizing proxy operations. if embedded:// is specified, an in memory ephemeral instance created.")
fs.BoolVar(&o.insecure, "spicedb-insecure", false, "If set to true uses the insecure transport configuration for gRPC. Set to false by default.")
fs.BoolVar(&o.skipVerifyCA, "spicedb-skip-verify-ca", false, "If set to true backend certificate trust chain is not verified. Set to false by default.")
fs.StringVar(&o.token, "spicedb-token", "", "specifies the preshared key to use with the remote SpiceDB")
}

func (o *Options) Complete(ctx context.Context) error {
Expand All @@ -69,14 +94,15 @@ func (o *Options) Complete(ctx context.Context) error {
if !filepath.IsAbs(o.BackendKubeconfigPath) {
pwd, err := os.Getwd()
if err != nil {
return err
return fmt.Errorf("couldn't load kubeconfig: %w", err)
}
o.BackendKubeconfigPath = filepath.Join(pwd, o.BackendKubeconfigPath)
}
o.BackendConfig, err = clientcmd.LoadFromFile(o.BackendKubeconfigPath)
if err != nil {
return err
return fmt.Errorf("couldn't load kubeconfig: %w", err)
}
klog.FromContext(ctx).WithValues("kubeconfig", o.BackendKubeconfigPath).Error(err, "loaded backend kube config")
}

if !filepath.IsAbs(o.SecureServing.ServerCert.CertDirectory) {
Expand All @@ -97,11 +123,59 @@ func (o *Options) Complete(ctx context.Context) error {

o.AdditionalAuthEnabled = o.Authentication.AdditionalAuthEnabled()

o.SpicedbServer, err = spicedb.NewServer(ctx)
spicedbURl, err := url.Parse(o.SpiceDBEndpoint)
if err != nil {
return err
return fmt.Errorf("unable to parse SpiceDB endpoint URL: %w", err)
}

var conn *grpc.ClientConn
if spicedbURl.Scheme == "embedded" {
klog.FromContext(ctx).WithValues("spicedb-endpoint", o.SpiceDBEndpoint).Info("using embedded SpiceDB")
o.EmbeddedSpiceDB, err = spicedb.NewServer(ctx)
if err != nil {
return fmt.Errorf("unable to stand up embedded SpiceDB: %w", err)
}

conn, err = o.EmbeddedSpiceDB.GRPCDialContext(ctx, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("unable to open gRPC connection with embedded SpiceDB: %w", err)
}
} else {
klog.FromContext(ctx).WithValues("spicedb-endpoint", o.SpiceDBEndpoint).
WithValues("spicedb-insecure", o.insecure).
WithValues("spicedb-skip-verify-ca", o.skipVerifyCA).
Info("using remote SpiceDB")
var opts []grpc.DialOption
if o.insecure {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
opts = append(opts, grpcutil.WithInsecureBearerToken(o.token))
} else {
opts = append(opts, grpcutil.WithBearerToken(o.token))
verification := grpcutil.VerifyCA
if o.skipVerifyCA {
verification = grpcutil.SkipVerifyCA
}

certs, err := grpcutil.WithSystemCerts(verification)
if err != nil {
return fmt.Errorf("unable to load system certificates: %w", err)
}

opts = append(opts, certs)
}
opts = append(opts, grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoff.DefaultConfig}))

timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
conn, err = grpc.DialContext(timeoutCtx, o.SpiceDBEndpoint, opts...)
if err != nil {
return fmt.Errorf("unable to open gRPC connection to remote SpiceDB at %s: %w", o.SpiceDBEndpoint, err)
}
}

o.PermissionsClient = v1.NewPermissionsServiceClient(conn)
o.WatchClient = v1.NewWatchServiceClient(conn)

return nil
}

Expand Down
Loading

0 comments on commit e82f1ca

Please sign in to comment.