Commit ce6e563c authored by Patrik Dufresne's avatar Patrik Dufresne

Initial project structure

parents
*.py[co]
# Packages
.eggs
*.egg-info
dist
build
eggs
parts
bin
var
sdist
develop-eggs
.installed.cfg
# Installer logs
pip-log.txt
# Unit test / coverage reports
.coverage
.tox
#Mr Developer
.mr.developer.cfg
.project
.externalToolBuilders
.settings
/ez_setup
.pydevproject
coverage.xml
nosetests.xml
/sonar-project.properties
/.eggs/
This project hold all the core components of Open Source Event Correlation (OSEC).
TODO
## Alarm Modeling
Reference: https://www.itu.int/rec/T-REC-X.733-199202-I/
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Patrik Dufresne Service Logiciel inc. All rights reserved.
# Patrik Dufresne Service Logiciel PROPRIETARY/CONFIDENTIAL.
# Use is subject to license terms.
import argparse
import os
import osec.rest
def main():
parser = argparse.ArgumentParser(description='Open Source Event Correlation')
parser.add_argument('action', metavar='ACTION', type=str, nargs='?',
help='Define the components to be started: rest, enrich')
# Rest
parser.add_argument('--host', default=os.environ.get('OSEC_HOST', '127.0.0.1'),
help='Host name to listen for RESTful API. Default to: 127.0.0.1. OSEC_HOST')
parser.add_argument('--port', type=int, default=os.environ.get('OSEC_PORT', '8081'),
help='Port to listen for RESTful API. Default to: 8081 OSEC_PORT')
parser.add_argument('--secret', default=os.environ.get('OSEC_SECRET', 'osec'),
help='Secret used as a password for the RESTful API. Default: osec. OSEC_SECRET')
args = parser.parse_args()
if args.action == 'rest':
osec.rest.run(args.host, args.port, args.secret)
elif args.action == 'enrich':
pass
else:
print('unknown action: %s' % args.action)
exit(1)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Patrik Dufresne Service Logiciel inc. All rights reserved.
# Patrik Dufresne Service Logiciel PROPRIETARY/CONFIDENTIAL.
# Use is subject to license terms.
"""
Created on Sep 26, 2018
@author: patrik dufresne
"""
from osec import main
if __name__ == '__main__': # Script executed directly?
main()
'''
Created on Sep 26, 2018
@author: ikus060
'''
class Alarm(dict):
'''
Representation of an alarm.
'''
def __init__(self):
'''
Constructor
'''
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Patrik Dufresne Service Logiciel inc. All rights reserved.
# Patrik Dufresne Service Logiciel PROPRIETARY/CONFIDENTIAL.
# Use is subject to license terms.
'''
Created on Sep 26, 2018
@author: ikus060
'''
import os
import cherrypy
from kafka.producer import KafkaProducer
from osec.rest.alarms import Alarms
USERS = {'osec': os.environ.get('OSEC_SECRET', 'osec')}
# TODO Must allow creation of users.
def validate_password(realm, username, password):
if username in USERS and USERS[username] == password:
return True
return False
@cherrypy.tools.auth_basic(realm='osec-api', checkpassword=validate_password)
class Root(object):
alarms = Alarms()
@cherrypy.expose
def index(self):
return 'OSEC RESTful API'
def run(host, port, secret):
cherrypy.config.update({
'server.socket_host': host,
'server.socket_port': port,
})
cherrypy.quickstart(Root(), '/')
'''
Created on Sep 26, 2018
@author: ikus060
'''
import cherrypy
@cherrypy.expose
class Alarms(object):
@cherrypy.expose
@cherrypy.tools.json_out()
@cherrypy.tools.json_in()
def index(self):
if cherrypy.request.method in ['PUT', 'POST']:
return self.alarms_post()
else:
return self.alarms_get()
def alarms_get(self):
return ""
def alarms_post(self):
cherrypy.log('post alarms')
return ""
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Patrik Dufresne Service Logiciel inc. All rights reserved.
# Patrik Dufresne Service Logiciel PROPRIETARY/CONFIDENTIAL.
# Use is subject to license terms.
'''
Created on Sep 26, 2018
@author: ikus060
'''
from base64 import b64encode
import unittest
import cherrypy
from cherrypy.test import helper
from osec.alarm import Alarm
import osec.rest
import json
class TestRest(helper.CPWebCase):
def setup_server():
cherrypy.tree.mount(osec.rest.Root())
setup_server = staticmethod(setup_server)
interactive = False
def test_alarm_post(self):
a = Alarm()
a['identifier'] = "A::B::C"
# Make the query
body = json.dumps(a)
headers = [
("Authorization", "Basic " + b64encode(b"osec:osec").decode('ascii')),
("Content-Type", "application/json"),
("Content-Length", str(len(body)))
]
self.getPage('/alarms/', method='POST', headers=headers, body=body)
self.assertStatus(200)
pass
if __name__ == "__main__":
# import sys;sys.argv = ['', 'Test.testName']
unittest.main()
[nosetests]
with-xunit=1
with-coverage=1
cover-erase=1
cover-xml=1
cover-inclusive=1
cover-package=osec
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Patrik Dufresne Service Logiciel inc. All rights reserved.
# Patrik Dufresne Service Logiciel PROPRIETARY/CONFIDENTIAL.
# Use is subject to license terms.
from __future__ import print_function
import setuptools
setuptools.setup(
name="osec",
use_scm_version={"root": "..", "relative_to": __file__},
description='Open Source Event Correlation',
long_description='TODO',
author='Patrik Dufresne Service Logiciel inc.',
url='http://www.patrikdufresne.com/',
packages=['osec'],
include_package_data=True,
python_requires='>=3.6',
setup_requires=[
"setuptools_scm",
],
install_requires=[
"kafka-python>=1.4.3",
"cherrypy",
],
# requirement for testing
tests_require=[
"mock>=1.3.0",
"pytest",
"nose",
],
entry_points={"console_scripts": ["osec = osec:run"], },
)
# Copyright (C) 2018 Patrik Dufresne Service Logiciel inc. All rights reserved.
# Patrik Dufresne Service Logiciel PROPRIETARY/CONFIDENTIAL.
# Use is subject to license terms.
[tox]
envlist = py36
[testenv]
deps=
nose
coverage
commands=python setup.py nosetests --xunit-file=nosetests-{envname}.xml --xunit-testsuite-name={envname} --cover-xml-file=coverage-{envname}.xml
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment