Commit 504e87c9 authored by KangMin An's avatar KangMin An
Browse files

Create & Update : 데이터 처리를 위한 코드 생성 및 수정.

parent 63228a5b
......@@ -7,3 +7,6 @@ package-lock.json
# Project Data
/data
# Python Cache
__pycache__
......@@ -55,7 +55,7 @@
# 3. Data Processing
EUE가 제일 중요하게 수행해야할 부분입니다. 데이터를 학습하고 예측 값을 반환합니다.
EUE가 제일 중요하게 수행해야할 부분입니다. 데이터에 대해 선형회귀 분석을 진행합니다. 이 결과를 바탕으로 단위 시간 후의 온도를 예측해봅니다.
## Input Data
......@@ -64,7 +64,6 @@ EUE가 제일 중요하게 수행해야할 부분입니다. 데이터를 학습
- 월 ( Month )
- 일 ( Date )
- 시 ( Hour )
- 분 ( Minute )
- 외부 데이터
......@@ -90,4 +89,4 @@ EUE가 제일 중요하게 수행해야할 부분입니다. 데이터를 학습
[Linear Regression](https://ko.wikipedia.org/wiki/선형_회귀)를 통해서 데이터들의 선형 관계를 파악 후 다음의 온도를 예측해보려 합니다.
훈련 데이터는 최근 7일 간의 데이터를 사용합니다. 훈련을 통해 생성된 가중치들은 데이터들과 마찬가지로 CSV형식의 독립적인 파일로 생성해 저장합니다. 저장된 가중치는 다음 훈련의 초기값으로 사용됩니다.
매일 자정(Day K) 데이터 처리 과정이 진행 됩니다. 따라서 (Day K - 1)의 데이터들과 (Day K - 1)까지 사용된 가중치 데이터들을 이용해 Linear Regression을 진행합니다. 데이터 처리 과정이 진행된 후의 가중치들은 (Day K)의 가중치 파일로 생성되어 저장됩니다.
......@@ -28,6 +28,7 @@
"morgan": "^1.10.0",
"mysql2": "^2.2.5",
"node-fetch": "^2.6.1",
"node-schedule": "^2.0.0",
"pug": "^3.0.0"
},
"devDependencies": {
......
"""
# main.py
- Load된 데이터들에 대해 Linear Regression을 진행합니다.
- 진행된 후의 Weights를 파일로 저장합니다.
"""
import sys
import pymysql
from preprocessing import preprocessingData
def getUsersDataLinks(dbconfig):
eue_db = pymysql.connect(user=dbconfig["user"], password=dbconfig["password"],
host=dbconfig["host"], db=dbconfig["database"], charset='utf8')
cursor = eue_db.cursor(pymysql.cursors.DictCursor)
query = "SELECT ID,DATALINK FROM USER;"
cursor.execute(query)
result = cursor.fetchall()
return result
dbconfig = {"host": sys.argv[1], "user": sys.argv[2],
"password": sys.argv[3], "database": sys.argv[4]}
users = getUsersDataLinks(dbconfig)
for userdata in users:
# Get Data links
# ./data/DO/SGG/EMD/Users/ID
user_datalink = userdata["DATALINK"]
dir_ls = user_datalink.split("/")
# ./data/DO/SGG/EMD/Outside
outside_datalink = ("/").join(dir_ls[:-2]) + "/Outside"
# data load
train_x, train_t = preprocessingData(user_datalink, outside_datalink)
# linear regression
pass
'''
# initialize.py
- Data 전처리를 목적으로 하는 파일입니다.
'''
import os
import datetime
import csv
import numpy as np
def loadRawData(link):
'''
# CSV 파일의 내용을 반환하는 함수
- 어제 하루 기록된 파일들에 대해 진행하기 위해 날짜 정보를 생성합니다.
- 제공 받은 링크를 통해 파일을 읽고 반환합니다.
'''
raw_data = []
today = datetime.datetime.today()
yesterday = today - datetime.timedelta(days=1)
yMonth = yesterday.month if yesterday.month >= 10 else "0" + \
str(yesterday.month)
yDay = yesterday.day if yesterday.day >= 10 else "0"+str(yesterday.day)
time_dir = "/" + str(yesterday.year) + "/" + \
str(yesterday.year) + str(yMonth) + "/" + \
str(yesterday.year) + str(yMonth) + str(yDay)
weather_dir = os.getcwd() + "/server" + link + time_dir + "/weather.csv"
data_file = open(weather_dir, 'r', newline='')
csv_data = csv.reader(data_file)
for line in csv_data:
raw_data.append(line)
data_file.close()
return raw_data
def handleUserRawData(user_data):
'''
# User Raw Data (CSV 파일 데이터) 가공 함수
- [ 월 / 일 / 시 / 분 / 온도 / 습도 / 광도 ]의 데이터를 변환하는 함수
- 월 / 일 / 분 제거
- test_data(분이 제거된 데이터)와 true_data(단위 시간 후 실제 온도)로 나누기
'''
user_x = []
train_t = []
isFirstLine = True
for line in user_data:
_, _, hour, _, temp, humi, lights = line
user_x.append([int(hour), float(temp), float(humi), float(lights)])
if isFirstLine:
isFirstLine = False
else:
train_t.append([float(temp)])
train_t.append(train_t[-1])
return (user_x, train_t)
def handleOutRawData(out_data):
'''
# Out Raw Data (CSV 파일 데이터) 가공 함수
- [ 월 / 일 / 시 / 분 / 온도 / 습도 / 기압 / 풍속 ] 데이터를 변환하는 함수
- '분' 을 제거합니다.
- 같은 시각의 데이터들은 평균을 구해서 데이터로 저장합니다.
- 외부 데이터는 Dictionary Data로 최종 반환됩니다.
- Dictionary의 Key는 '시'가 됩니다.
'''
out_dict = {}
key = None
counter = 1
sum_temp, sum_humi, sum_pressure, sum_wind_speed = 0, 0, 0, 0
for line in out_data:
month, day, hour, _, temp, humi, pressure, wind_speed = line
if key == None:
key = int(hour)
counter = 1
sum_temp, sum_humi, sum_pressure, sum_wind_speed = float(
temp), float(humi), float(pressure), float(wind_speed)
if key == hour:
counter += 1
sum_temp += float(temp)
sum_humi += float(humi)
sum_pressure += float(pressure)
sum_wind_speed += float(wind_speed)
else:
out_dict[key] = [int(month), int(day), key, sum_temp/counter, sum_humi /
counter, sum_pressure/counter, sum_wind_speed/counter]
key = int(hour)
counter = 1
sum_temp, sum_humi, sum_pressure, sum_wind_speed = float(
temp), float(humi), float(pressure), float(wind_speed)
return out_dict
def combineXdata(user_x, out_dict):
'''
# 분리된 입력 데이터를 합치는 함수
- 사용자 데이터와 외부 데이터를 결합해 입력층의 값으로 가공합니다.
'''
train_x = []
for line in user_x:
hour, temp, humi, lights = line
x = out_dict[hour] + [temp, humi, lights]
train_x.append(x)
return train_x
def preprocessingData(user_link, out_link):
'''
# 데이터 분석 전 데이터 전처리 함수입니다.
1. 데이터 로드
2. 데이터 1차 가공 (handle~RawData)
3. 데이터 2차 가공 (combineXdata)
4. 데이터 넘파이 형식 배열로 변환
5. 반환
'''
raw_user_data = loadRawData(user_link)
raw_out_data = loadRawData(out_link)
user_x, train_t = handleUserRawData(raw_user_data)
out_dict = handleOutRawData(raw_out_data)
train_x = combineXdata(user_x, out_dict)
train_x = np.array(train_x)
train_t = np.array(train_t)
return train_x, train_t
import app from "./app";
import dotenv from "dotenv";
import "./schedules"; // 매일 자정 데이터 처리
dotenv.config();
const PORT = process.env.PORT || 4500;
......
import schedule from "node-schedule";
import { spawn } from "child_process";
import dotenv from "dotenv";
dotenv.config();
// Data Processing Python Codes Directory - server directory에서 실행
const DATA_PROCESSING_DIR = "./src/data_processing/main.py";
// 매일 자정에 실행할 스케줄의 규칙
const rule = new schedule.RecurrenceRule();
rule.hour = 0;
rule.minute = 0;
rule.second = 0;
// 매일 자정에 실행되는 데이터 처리 스케줄
const dataProcessingJob = schedule.scheduleJob(rule, () => {
const today = new Date();
console.log(
`${today.getFullYear()}.${
today.getMonth() + 1
}.${today.getDate()} - Data Processing Start.`
);
const pyprocess = spawn("python", [
DATA_PROCESSING_DIR,
process.env.MYSQL_HOST,
process.env.MYSQL_USER,
process.env.MYSQL_PASSWORD,
process.env.MYSQL_DATABASE,
]);
pyprocess.stdout.on("data", (data) => {
console.log("Data processing is start.");
});
pyprocess.stderr.on("data", (data) => {
console.log("Error in the data processing.");
});
pyprocess.on("close", () => {
console.log("The data processing done.");
});
});
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment