Commit 11028140 authored by klafyvel's avatar klafyvel Committed by Gabriel Detraz
Browse files

feat: Move LDAP to an optional app.

The Entire LDAP infrastructures now relies on signals rather than direct function calls and is in its own app. This means it can be deactivated, but also that we can easily plug new services in addition to LDAP, such as OAuth.

Closes issue #270
parent 9b6ece28
from django.contrib import admin
from .models import (
LdapUser,
LdapServiceUser,
LdapServiceUserGroup,
LdapUserGroup,
)
class LdapUserAdmin(admin.ModelAdmin):
"""LdapUser Admin view. Can't change password, manage
by User General model.
Parameters:
Django ModelAdmin: Apply on django ModelAdmin
"""
list_display = ("name", "uidNumber", "login_shell")
exclude = ("user_password", "sambat_nt_password")
search_fields = ("name",)
class LdapServiceUserAdmin(admin.ModelAdmin):
"""LdapServiceUser Admin view. Can't change password, manage
by User General model.
Parameters:
Django ModelAdmin: Apply on django ModelAdmin
"""
list_display = ("name",)
exclude = ("user_password",)
search_fields = ("name",)
class LdapUserGroupAdmin(admin.ModelAdmin):
"""LdapUserGroup Admin view.
Parameters:
Django ModelAdmin: Apply on django ModelAdmin
"""
list_display = ("name", "members", "gid")
search_fields = ("name",)
class LdapServiceUserGroupAdmin(admin.ModelAdmin):
"""LdapServiceUserGroup Admin view.
Parameters:
Django ModelAdmin: Apply on django ModelAdmin
"""
list_display = ("name",)
search_fields = ("name",)
admin.site.register(LdapUser, LdapUserAdmin)
admin.site.register(LdapUserGroup, LdapUserGroupAdmin)
admin.site.register(LdapServiceUser, LdapServiceUserAdmin)
admin.site.register(LdapServiceUserGroup, LdapServiceUserGroupAdmin)
from django.apps import AppConfig
class LdapSyncConfig(AppConfig):
name = 'ldap_sync'
# Copyright © 2018 Maël Kervella
# Copyright © 2021 Hugo Levy-Falk
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
......@@ -21,6 +22,7 @@ from django.core.management.base import BaseCommand, CommandError
from django.conf import settings
from users.models import User, ListRight
from ldap_sync.models import synchronise_user, synchronise_serviceuser, synchronise_usergroup
def split_lines(lines):
......@@ -89,9 +91,9 @@ def flush_ldap(binddn, bindpass, server, usersdn, groupsdn):
def sync_ldap():
"""Syncrhonize the whole LDAP with the DB."""
for u in User.objects.all():
u.ldap_sync()
synchronise_user(sender=User, instance=u)
for lr in ListRight.objects.all():
lr.ldap_sync()
synchronise_usergroup(sender=ListRight, instance=lr)
class Command(BaseCommand):
......
# Copyright © 2017 Gabriel Détraz
# Copyright © 2017 Lara Kermarec
# Copyright © 2017 Augustin Lemesle
# Copyright © 2020 Hugo Levy-Falk
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
......@@ -19,6 +20,7 @@
from django.core.management.base import BaseCommand, CommandError
from users.models import User
from ldap_sync.models import synchronise_user
class Command(BaseCommand):
......@@ -36,5 +38,5 @@ class Command(BaseCommand):
)
def handle(self, *args, **options):
for usr in User.objects.all():
usr.ldap_sync(mac_refresh=options["full"])
for user in User.objects.all():
synchronise_user(sender=User, instance=user)
# -*- coding: utf-8 -*-
# Generated by Django 1.11.29 on 2021-01-10 16:59
from __future__ import unicode_literals
from django.db import migrations
#from django.conf import settings
import ldapdb.models.fields
#from ldap_sync.management.commands.ldap_rebuild import flush_ldap, sync_ldap
#def rebuild_ldap(apps, schema_editor):
# usersdn = settings.LDAP["base_user_dn"]
# groupsdn = settings.LDAP["base_usergroup_dn"]
# binddn = settings.DATABASES["ldap"]["USER"]
# bindpass = settings.DATABASES["ldap"]["PASSWORD"]
# server = settings.DATABASES["ldap"]["NAME"]
# flush_ldap(binddn, bindpass, server, usersdn, groupsdn)
class Migration(migrations.Migration):
initial = True
dependencies = [
('users', '0004_auto_20210110_1811')
]
operations = [
migrations.CreateModel(
name='LdapServiceUser',
fields=[
('dn', ldapdb.models.fields.CharField(max_length=200, serialize=False)),
('name', ldapdb.models.fields.CharField(db_column='cn', max_length=200, primary_key=True, serialize=False)),
('user_password', ldapdb.models.fields.CharField(blank=True, db_column='userPassword', max_length=200, null=True)),
],
options={
'abstract': False,
},
),
migrations.CreateModel(
name='LdapServiceUserGroup',
fields=[
('dn', ldapdb.models.fields.CharField(max_length=200, serialize=False)),
('name', ldapdb.models.fields.CharField(db_column='cn', max_length=200, primary_key=True, serialize=False)),
('members', ldapdb.models.fields.ListField(blank=True, db_column='member')),
],
options={
'abstract': False,
},
),
migrations.CreateModel(
name='LdapUser',
fields=[
('dn', ldapdb.models.fields.CharField(max_length=200, serialize=False)),
('gid', ldapdb.models.fields.IntegerField(db_column='gidNumber')),
('name', ldapdb.models.fields.CharField(db_column='cn', max_length=200, primary_key=True, serialize=False)),
('uid', ldapdb.models.fields.CharField(db_column='uid', max_length=200)),
('uidNumber', ldapdb.models.fields.IntegerField(db_column='uidNumber', unique=True)),
('sn', ldapdb.models.fields.CharField(db_column='sn', max_length=200)),
('login_shell', ldapdb.models.fields.CharField(blank=True, db_column='loginShell', max_length=200, null=True)),
('mail', ldapdb.models.fields.CharField(db_column='mail', max_length=200)),
('given_name', ldapdb.models.fields.CharField(db_column='givenName', max_length=200)),
('home_directory', ldapdb.models.fields.CharField(db_column='homeDirectory', max_length=200)),
('display_name', ldapdb.models.fields.CharField(blank=True, db_column='displayName', max_length=200, null=True)),
('dialupAccess', ldapdb.models.fields.CharField(db_column='dialupAccess', max_length=200)),
('sambaSID', ldapdb.models.fields.IntegerField(db_column='sambaSID', unique=True)),
('user_password', ldapdb.models.fields.CharField(blank=True, db_column='userPassword', max_length=200, null=True)),
('sambat_nt_password', ldapdb.models.fields.CharField(blank=True, db_column='sambaNTPassword', max_length=200, null=True)),
('macs', ldapdb.models.fields.ListField(blank=True, db_column='radiusCallingStationId', max_length=200, null=True)),
('shadowexpire', ldapdb.models.fields.CharField(blank=True, db_column='shadowExpire', max_length=200, null=True)),
],
options={
'abstract': False,
},
),
migrations.CreateModel(
name='LdapUserGroup',
fields=[
('dn', ldapdb.models.fields.CharField(max_length=200, serialize=False)),
('gid', ldapdb.models.fields.IntegerField(db_column='gidNumber')),
('members', ldapdb.models.fields.ListField(blank=True, db_column='memberUid')),
('name', ldapdb.models.fields.CharField(db_column='cn', max_length=200, primary_key=True, serialize=False)),
],
options={
'abstract': False,
},
),
migrations.AlterField(
model_name='ldapserviceuser',
name='dn',
field=ldapdb.models.fields.CharField(max_length=200, primary_key=True, serialize=False),
),
migrations.AlterField(
model_name='ldapserviceusergroup',
name='dn',
field=ldapdb.models.fields.CharField(max_length=200, primary_key=True, serialize=False),
),
migrations.AlterField(
model_name='ldapuser',
name='dn',
field=ldapdb.models.fields.CharField(max_length=200, primary_key=True, serialize=False),
),
migrations.AlterField(
model_name='ldapusergroup',
name='dn',
field=ldapdb.models.fields.CharField(max_length=200, primary_key=True, serialize=False),
),
]
import sys
from django.db import models
from django.conf import settings
from django.dispatch import receiver
from django.contrib.auth.models import Group
import ldapdb.models
import ldapdb.models.fields
import users.signals
import users.models
import machines.models
class LdapUser(ldapdb.models.Model):
"""A class representing a LdapUser in LDAP, its LDAP conterpart.
Synced from re2o django User model, (User django models),
with a copy of its attributes/fields into LDAP, so this class is a mirror
of the classic django User model.
The basedn userdn is specified in settings.
Attributes:
name: The name of this User
uid: The uid (login) for the unix user
uidNumber: Linux uid number
gid: The default gid number for this user
sn: The user "str" pseudo
login_shell: Linux shell for the user
mail: Email address contact for this user
display_name: Pretty display name for this user
dialupAccess: Boolean, True for valid membership
sambaSID: Identical id as uidNumber
user_password: SSHA hashed password of user
samba_nt_password: NTLM hashed password of user
macs: Multivalued mac address
shadowexpire: Set it to 0 to block access for this user and disabled
account
"""
# LDAP meta-data
base_dn = settings.LDAP["base_user_dn"]
object_classes = [
"inetOrgPerson",
"top",
"posixAccount",
"sambaSamAccount",
"radiusprofile",
"shadowAccount",
]
# attributes
gid = ldapdb.models.fields.IntegerField(db_column="gidNumber")
name = ldapdb.models.fields.CharField(
db_column="cn", max_length=200, primary_key=True
)
uid = ldapdb.models.fields.CharField(db_column="uid", max_length=200)
uidNumber = ldapdb.models.fields.IntegerField(db_column="uidNumber", unique=True)
sn = ldapdb.models.fields.CharField(db_column="sn", max_length=200)
login_shell = ldapdb.models.fields.CharField(
db_column="loginShell", max_length=200, blank=True, null=True
)
mail = ldapdb.models.fields.CharField(db_column="mail", max_length=200)
given_name = ldapdb.models.fields.CharField(db_column="givenName", max_length=200)
home_directory = ldapdb.models.fields.CharField(
db_column="homeDirectory", max_length=200
)
display_name = ldapdb.models.fields.CharField(
db_column="displayName", max_length=200, blank=True, null=True
)
dialupAccess = ldapdb.models.fields.CharField(db_column="dialupAccess")
sambaSID = ldapdb.models.fields.IntegerField(db_column="sambaSID", unique=True)
user_password = ldapdb.models.fields.CharField(
db_column="userPassword", max_length=200, blank=True, null=True
)
sambat_nt_password = ldapdb.models.fields.CharField(
db_column="sambaNTPassword", max_length=200, blank=True, null=True
)
macs = ldapdb.models.fields.ListField(
db_column="radiusCallingStationId", max_length=200, blank=True, null=True
)
shadowexpire = ldapdb.models.fields.CharField(
db_column="shadowExpire", blank=True, null=True
)
def __str__(self):
return self.name
def __unicode__(self):
return self.name
def save(self, *args, **kwargs):
self.sn = self.name
self.uid = self.name
self.sambaSID = self.uidNumber
super(LdapUser, self).save(*args, **kwargs)
@receiver(users.signals.synchronise, sender=users.models.User)
def synchronise_user(sender, **kwargs):
"""
Synchronise an User to the LDAP.
Args:
* sender : The model class.
* instance : The actual instance being synchronised.
* base : Default `True`. When `True`, synchronise basic attributes.
* access_refresh : Default `True`. When `True`, synchronise the access time.
* mac_refresh : Default `True`. When True, synchronise the list of mac addresses.
* group_refresh: Default `False`. When `True` synchronise the groups of the instance.
"""
base=kwargs.get('base', True)
access_refresh=kwargs.get('access_refresh', True)
mac_refresh=kwargs.get('mac_refresh', True )
group_refresh=kwargs.get('group_refresh', False)
user=kwargs["instance"]
if sys.version_info[0] >= 3 and (
user.state == user.STATE_ACTIVE
or user.state == user.STATE_ARCHIVE
or user.state == user.STATE_DISABLED
):
user.refresh_from_db()
try:
user_ldap = LdapUser.objects.get(uidNumber=user.uid_number)
except LdapUser.DoesNotExist:
user_ldap = LdapUser(uidNumber=user.uid_number)
base = True
access_refresh = True
mac_refresh = True
if base:
user_ldap.name = user.pseudo
user_ldap.sn = user.pseudo
user_ldap.dialupAccess = str(user.has_access())
user_ldap.home_directory = user.home_directory
user_ldap.mail = user.get_mail
user_ldap.given_name = (
user.surname.lower() + "_" + user.name.lower()[:3]
)
user_ldap.gid = settings.LDAP["user_gid"]
if "{SSHA}" in user.password or "{SMD5}" in user.password:
# We remove the extra $ added at import from ldap
user_ldap.user_password = user.password[:6] + user.password[7:]
elif "{crypt}" in user.password:
# depending on the length, we need to remove or not a $
if len(user.password) == 41:
user_ldap.user_password = user.password
else:
user_ldap.user_password = user.password[:7] + user.password[8:]
user_ldap.sambat_nt_password = user.pwd_ntlm.upper()
if user.get_shell:
user_ldap.login_shell = str(user.get_shell)
user_ldap.shadowexpire = user.get_shadow_expire
if access_refresh:
user_ldap.dialupAccess = str(user.has_access())
if mac_refresh:
user_ldap.macs = [
str(mac)
for mac in machines.models.Interface.objects.filter(machine__user=user)
.values_list("mac_address", flat=True)
.distinct()
]
if group_refresh:
# Need to refresh all groups because we don't know which groups
# were updated during edition of groups and the user may no longer
# be part of the updated group (case of group removal)
for group in Group.objects.all():
if hasattr(group, "listright"):
synchronise_usergroup(users.models.ListRight, instance=group.listright)
user_ldap.save()
@receiver(users.signals.remove, sender=users.models.User)
def remove_user(sender, **kwargs):
user = kwargs["instance"]
try:
user_ldap = LdapUser.objects.get(name=user.pseudo)
user_ldap.delete()
except LdapUser.DoesNotExist:
pass
@receiver(users.signals.remove_mass, sender=users.models.User)
def remove_users(sender, **kwargs):
queryset_users = kwargs["queryset"]
LdapUser.objects.filter(
name__in=list(queryset_users.values_list("pseudo", flat=True))
).delete()
class LdapUserGroup(ldapdb.models.Model):
"""A class representing a LdapUserGroup in LDAP, its LDAP conterpart.
Synced from UserGroup, (ListRight/Group django models),
with a copy of its attributes/fields into LDAP, so this class is a mirror
of the classic django ListRight model.
The basedn usergroupdn is specified in settings.
Attributes:
name: The name of this LdapUserGroup
gid: The gid number for this unix group
members: Users dn members of this LdapUserGroup
"""
# LDAP meta-data
base_dn = settings.LDAP["base_usergroup_dn"]
object_classes = ["posixGroup"]
# attributes
gid = ldapdb.models.fields.IntegerField(db_column="gidNumber")
members = ldapdb.models.fields.ListField(db_column="memberUid", blank=True)
name = ldapdb.models.fields.CharField(
db_column="cn", max_length=200, primary_key=True
)
def __str__(self):
return self.name
@receiver(users.signals.synchronise, sender=users.models.ListRight)
def synchronise_usergroup(sender, **kwargs):
group = kwargs["instance"]
try:
group_ldap = LdapUserGroup.objects.get(gid=group.gid)
except LdapUserGroup.DoesNotExist:
group_ldap = LdapUserGroup(gid=group.gid)
group_ldap.name = group.unix_name
group_ldap.members = [user.pseudo for user in group.user_set.all()]
group_ldap.save()
@receiver(users.signals.remove, sender=users.models.ListRight)
def remove_usergroup(sender, **kwargs):
group = kwargs["instance"]
try:
group_ldap = LdapUserGroup.objects.get(gid=group.gid)
group_ldap.delete()
except LdapUserGroup.DoesNotExist:
pass
class LdapServiceUser(ldapdb.models.Model):
"""A class representing a ServiceUser in LDAP, its LDAP conterpart.
Synced from ServiceUser, with a copy of its attributes/fields into LDAP,
so this class is a mirror of the classic django ServiceUser model.
The basedn userservicedn is specified in settings.
Attributes:
name: The name of this ServiceUser
user_password: The SSHA hashed password of this ServiceUser
"""
# LDAP meta-data
base_dn = settings.LDAP["base_userservice_dn"]
object_classes = ["applicationProcess", "simpleSecurityObject"]
# attributes
name = ldapdb.models.fields.CharField(
db_column="cn", max_length=200, primary_key=True
)
user_password = ldapdb.models.fields.CharField(
db_column="userPassword", max_length=200, blank=True, null=True
)
def __str__(self):
return self.name
def synchronise_serviceuser_group(serviceuser):
try:
group = LdapServiceUserGroup.objects.get(name=serviceuser.access_group)
except:
group = LdapServiceUserGroup(name=serviceuser.access_group)
group.members = list(
LdapServiceUser.objects.filter(
name__in=[
user.pseudo
for user in users.models.ServiceUser.objects.filter(
access_group=serviceuser.access_group
)
]
).values_list("dn", flat=True)
)
group.save()
@receiver(users.signals.synchronise, sender=users.models.ServiceUser)
def synchronise_serviceuser(sender, **kwargs):
user = kwargs["instance"]
try:
user_ldap = LdapServiceUser.objects.get(name=user.pseudo)
except LdapServiceUser.DoesNotExist:
user_ldap = LdapServiceUser(name=user.pseudo)
user_ldap.user_password = user.password[:6] + user.password[7:]
user_ldap.save()
synchronise_serviceuser_group(user)
@receiver(users.signals.remove, sender=users.models.ServiceUser)
def remove_serviceuser(sender, **kwargs):
user = kwargs["instance"]
try:
user_ldap = LdapServiceUser.objects.get(name=user.pseudo)
user_ldap.delete()
except LdapUser.DoesNotExist:
pass
synchronise_serviceuser_group(user)
class LdapServiceUserGroup(ldapdb.models.Model):
"""A class representing a ServiceUserGroup in LDAP, its LDAP conterpart.
Synced from ServiceUserGroup, with a copy of its attributes/fields into LDAP,
so this class is a mirror of the classic django ServiceUserGroup model.
The basedn userservicegroupdn is specified in settings.
Attributes:
name: The name of this ServiceUserGroup
members: ServiceUsers dn members of this ServiceUserGroup
"""
# LDAP meta-data
base_dn = settings.LDAP["base_userservicegroup_dn"]
object_classes = ["groupOfNames"]
# attributes
name = ldapdb.models.fields.CharField(
db_column="cn", max_length=200, primary_key=True
)
members = ldapdb.models.fields.ListField(db_column="member", blank=True)
def __str__(self):
return self.name
from django.test import TestCase
# Create your tests here.
from django.conf.urls import url
from .import views
urlpatterns = []
from django.shortcuts import render
# Create your views here.
......@@ -79,12 +79,12 @@ def context_optionnal_apps(request):
optionnal_templates_navbar_user_list = [
app.views.navbar_user()
for app in optionnal_apps
if hasattr(app.views, "navbar_user")
if hasattr(app, "views") and hasattr(app.views, "navbar_user")
]
optionnal_templates_navbar_logout_list = [
app.views.navbar_logout()
for app in optionnal_apps
if hasattr(app.views, "navbar_logout")
if hasattr(app, "views") and hasattr(app.views, "navbar_logout")
]
return {
"optionnal_templates_navbar_user_list": optionnal_templates_navbar_user_list,
......