Dldict by rohe · Pull Request #30 · IdentityPython/JWTConnect-Python-OidcMsg · GitHub
Skip to content
This repository was archived by the owner on Jun 1, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions .travis.yml
8 changes: 4 additions & 4 deletions src/oidcmsg/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
__author__ = 'Roland Hedberg'
__version__ = '1.2.0'
__author__ = "Roland Hedberg"
__version__ = "1.3.0"

import os

VERIFIED_CLAIM_PREFIX = '__verified'
VERIFIED_CLAIM_PREFIX = "__verified"


def verified_claim_name(claim):
return '{}_{}'.format(VERIFIED_CLAIM_PREFIX, claim)
return "{}_{}".format(VERIFIED_CLAIM_PREFIX, claim)


def proper_path(path):
Expand Down
20 changes: 10 additions & 10 deletions src/oidcmsg/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,41 @@
def add_issuer(conf, issuer):
res = {}
for key, val in conf.items():
if key == 'abstract_storage_cls':
if key == "abstract_storage_cls":
res[key] = val
else:
_val = copy.copy(val)
_val['issuer'] = quote_plus(issuer)
_val["issuer"] = quote_plus(issuer)
res[key] = _val
return res


class OidcContext(ImpExp):
parameter = {"keyjar": KeyJar, "issuer": None}

def __init__(self, config=None, keyjar=None, entity_id=''):
def __init__(self, config=None, keyjar=None, entity_id=""):
ImpExp.__init__(self)
if config is None:
config = {}

self.issuer = entity_id
self.keyjar = self._keyjar(keyjar, conf=config, entity_id=entity_id)

def _keyjar(self, keyjar=None, conf=None, entity_id=''):
def _keyjar(self, keyjar=None, conf=None, entity_id=""):
if keyjar is None:
if 'keys' in conf:
if "keys" in conf:
args = {k: v for k, v in conf["keys"].items() if k != "uri_path"}
_keyjar = init_key_jar(**args)
else:
_keyjar = KeyJar()
if 'jwks' in conf:
_keyjar.import_jwks(conf['jwks'], '')
if "jwks" in conf:
_keyjar.import_jwks(conf["jwks"], "")

if '' in _keyjar and entity_id:
if "" in _keyjar and entity_id:
# make sure I have the keys under my own name too (if I know it)
_keyjar.import_jwks_as_json(_keyjar.export_jwks_as_json(True, ''), entity_id)
_keyjar.import_jwks_as_json(_keyjar.export_jwks_as_json(True, ""), entity_id)

_httpc_params = conf.get('httpc_params')
_httpc_params = conf.get("httpc_params")
if _httpc_params:
_keyjar.httpc_params = _httpc_params

Expand Down
2 changes: 1 addition & 1 deletion src/oidcmsg/exception.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__author__ = 'Roland Hedberg'
__author__ = "Roland Hedberg"


class OidcMsgError(Exception):
Expand Down
101 changes: 84 additions & 17 deletions src/oidcmsg/impexp.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Any
from typing import List
from typing import Optional

from cryptojwt.utils import as_bytes
from cryptojwt.utils import importer
from cryptojwt.utils import qualified_name

Expand All @@ -9,19 +11,24 @@

class ImpExp:
parameter = {}
special_load_dump = {}
init_args = []

def __init__(self):
pass

def _dump(self, cls, item, exclude_attributes: Optional[List[str]] = None) -> dict:
if cls in [None, "", [], {}]:
val = item
def dump_attr(self, cls, item, exclude_attributes: Optional[List[str]] = None) -> dict:
if cls in [None, 0, "", [], {}, bool, b'']:
if cls == b'':
val = as_bytes(item)
else:
val = item
elif isinstance(item, Message):
val = {qualified_name(item.__class__): item.to_dict()}
elif cls == object:
val = qualified_name(item)
elif isinstance(cls, list):
val = [self._dump(cls[0], v, exclude_attributes) for v in item]
val = [self.dump_attr(cls[0], v, exclude_attributes) for v in item]
else:
val = item.dump(exclude_attributes=exclude_attributes)

Expand All @@ -31,42 +38,98 @@ def dump(self, exclude_attributes: Optional[List[str]] = None) -> dict:
_exclude_attributes = exclude_attributes or []
info = {}
for attr, cls in self.parameter.items():
if attr in _exclude_attributes:
if attr in _exclude_attributes or attr in self.special_load_dump:
continue

item = getattr(self, attr, None)
if item is None:
continue

info[attr] = self._dump(cls, item, exclude_attributes)
info[attr] = self.dump_attr(cls, item, exclude_attributes)

for attr, d in self.special_load_dump.items():
item = getattr(self, attr, None)
if item:
info[attr] = d["dump"](item, exclude_attributes=exclude_attributes)

return info

def _local_adjustments(self):
def local_load_adjustments(self, **kwargs):
pass

def _load(self, cls, item):
if cls in [None, "", [], {}]:
val = item
def load_attr(
self,
cls: Any,
item: dict,
init_args: Optional[dict] = None,
load_args: Optional[dict] = None,
) -> Any:
if load_args:
_kwargs = {"load_args": load_args}
_load_args = load_args
else:
_kwargs = {}
_load_args = {}

if init_args:
_kwargs["init_args"] = init_args

if cls in [None, 0, "", [], {}, bool, b'']:
if cls == b'':
val = as_bytes(item)
else:
val = item
elif cls == object:
val = importer(item)
elif isinstance(cls, list):
val = [cls[0]().load(v) for v in item]
if isinstance(cls[0], str):
_cls = importer(cls[0])
else:
_cls = cls[0]

if issubclass(_cls, ImpExp) and init_args:
_args = {k: v for k, v in init_args.items() if k in _cls.init_args}
else:
_args = {}

val = [_cls(**_args).load(v, **_kwargs) for v in item]
elif issubclass(cls, Message):
val = cls().from_dict(item)
_cls_name = list(item.keys())[0]
_cls = importer(_cls_name)
val = _cls().from_dict(item[_cls_name])
else:
val = cls().load(item)
if issubclass(cls, ImpExp) and init_args:
_args = {k: v for k, v in init_args.items() if k in cls.init_args}
else:
_args = {}

val = cls(**_args).load(item, **_kwargs)

return val

def load(self, item: dict):
def load(self, item: dict, init_args: Optional[dict] = None, load_args: Optional[dict] = None):

if load_args:
_kwargs = {"load_args": load_args}
_load_args = load_args
else:
_kwargs = {}
_load_args = {}

if init_args:
_kwargs["init_args"] = init_args

for attr, cls in self.parameter.items():
if attr not in item:
if attr not in item or attr in self.special_load_dump:
continue

setattr(self, attr, self._load(cls, item[attr]))
setattr(self, attr, self.load_attr(cls, item[attr], **_kwargs))

for attr, func in self.special_load_dump.items():
if attr in item:
setattr(self, attr, func["load"](item[attr], **_kwargs))

self._local_adjustments()
self.local_load_adjustments(**_load_args)
return self

def flush(self):
Expand All @@ -78,6 +141,10 @@ def flush(self):
for attr, cls in self.parameter.items():
if cls is None:
setattr(self, attr, None)
elif cls == 0:
setattr(self, attr, 0)
elif cls is bool:
setattr(self, attr, False)
elif cls == "":
setattr(self, attr, "")
elif cls == []:
Expand Down
132 changes: 132 additions & 0 deletions src/oidcmsg/item.py
Loading