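"""rforest.py

A from-scratch random forest classifier built on a simple CART-style
decision tree (axis-aligned splits chosen by Gini impurity), with a
small demo on the Iris dataset.
"""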
import numpy as np

from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split


class DecisionTreeNode:
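    # A single tree node: internal nodes carry a (feature, threshold)
    # split and two children; leaves carry a class label in `value`.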
def __init__(self, feature=None, threshold=None, left=None, right=None, value=None):
self.feature = feature
self.threshold = threshold
self.left = left
self.right = right
self.value = value


class DecisionTree:
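    # A CART-style classifier grown greedily: each node takes the
    # (feature, threshold) split with the lowest weighted Gini impurity,
    # until a node is pure or max_depth is reached.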
def __init__(self, max_depth=5):
self.max_depth = max_depth
def fit(self, X, y):
self.root = self._fit(X, y, depth=0)
def _fit(self, X, y, depth):
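        # Stopping conditions: the node is pure, the depth limit is hit,
        # or no valid split exists (handled below).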
if len(set(y)) == 1:
return DecisionTreeNode(value=y[0])
if depth >= self.max_depth:
return DecisionTreeNode(value=self._most_common_label(y))
best_split = self._best_split(X, y)
if best_split is None:
return DecisionTreeNode(value=self._most_common_label(y))
left_indices = X[:, best_split['feature']] <= best_split['threshold']
right_indices = X[:, best_split['feature']] > best_split['threshold']
left_node = self._fit(X[left_indices], y[left_indices], depth + 1)
right_node = self._fit(X[right_indices], y[right_indices], depth + 1)
return DecisionTreeNode(feature=best_split['feature'],
threshold=best_split['threshold'],
left=left_node,
right=right_node)
def _best_split(self, X, y):
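        # Exhaustive search: try every unique value of every feature as a
        # split threshold and keep the one with the lowest weighted Gini.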
num_samples, num_features = X.shape
best_gini = float('inf')
best_split = None
for feature in range(num_features):
feature_values = np.unique(X[:, feature])
for threshold in feature_values:
left_indices = X[:, feature] <= threshold
right_indices = X[:, feature] > threshold
                # left_indices/right_indices are boolean masks of length
                # num_samples, so test for an empty side with any(), not len()
                if not np.any(left_indices) or not np.any(right_indices):
continue
gini = self._gini_index(y[left_indices], y[right_indices])
if gini < best_gini:
best_gini = gini
best_split = {'feature': feature, 'threshold': threshold}
return best_split
def _gini_index(self, left_labels, right_labels):
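        # Gini impurity of a label set S is 1 - sum_k p_k**2, where p_k is
        # the fraction of samples in S belonging to class k; the returned
        # score is the size-weighted average over the two children.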
left_size = len(left_labels)
right_size = len(right_labels)
total_size = left_size + right_size
left_gini = 1.0 - sum((np.sum(left_labels == label) / left_size) ** 2 for label in np.unique(left_labels))
right_gini = 1.0 - sum((np.sum(right_labels == label) / right_size) ** 2 for label in np.unique(right_labels))
return (left_size / total_size) * left_gini + (right_size / total_size) * right_gini
def _most_common_label(self, y):
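        # np.bincount assumes non-negative integer class labels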
return np.bincount(y).argmax()
def predict(self, X):
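        # Route every sample from the root to a leaf and return its label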
return np.array([self._predict(sample, self.root) for sample in X])
def _predict(self, sample, node):
if node.value is not None:
return node.value
if sample[node.feature] <= node.threshold:
return self._predict(sample, node.left)
else:
return self._predict(sample, node.right)


class RandomForest:
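    # Bagging ensemble of decision trees: each tree is trained on a
    # bootstrap sample of the rows and a random subset of max_features
    # columns; prediction is a majority vote over all trees.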
def __init__(self, n_estimators=10, max_depth=5, max_features=None):
self.n_estimators = n_estimators
self.max_depth = max_depth
self.max_features = max_features
    def fit(self, X, y):
        self.trees = []
        num_features = X.shape[1]
        if self.max_features is None:
            self.max_features = num_features
        for _ in range(self.n_estimators):
            # Bootstrap sampling: draw len(X) rows with replacement
            indices = np.random.choice(len(X), len(X), replace=True)
            X_bootstrap = X[indices]
            y_bootstrap = y[indices]
            # Random feature subset of size max_features for this tree
            feature_indices = np.random.choice(num_features, self.max_features,
                                               replace=False)
            # Train a decision tree on the bootstrap sample and subset
            tree = DecisionTree(max_depth=self.max_depth)
            tree.fit(X_bootstrap[:, feature_indices], y_bootstrap)
            self.trees.append((tree, feature_indices))
    def predict(self, X):
        # Collect each tree's predictions on its own feature subset
        tree_predictions = np.array([tree.predict(X[:, feature_indices])
                                     for tree, feature_indices in self.trees])
        # Majority vote: most common predicted class for each sample
        return np.array([np.bincount(predictions).argmax()
                         for predictions in tree_predictions.T])


if __name__ == "__main__":
    # Load the Iris dataset
    iris = load_iris()
    X, y = iris.data, iris.target

    # Split into training and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=42)

    # Train the random forest
    rf = RandomForest(n_estimators=10, max_depth=5)
    rf.fit(X_train, y_train)

    # Make predictions and evaluate accuracy
    y_pred = rf.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Accuracy: {accuracy:.2f}")
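
    # The bootstrap and feature sampling above use the global NumPy RNG, so
    # the printed accuracy varies between runs; call np.random.seed(...)
    # before fit() for reproducible results.

    # Optional sanity check (a sketch, not part of the original script):
    # scikit-learn's own RandomForestClassifier should reach a similar
    # accuracy on the same split.
    from sklearn.ensemble import RandomForestClassifier
    sk_rf = RandomForestClassifier(n_estimators=10, max_depth=5,
                                   random_state=42)
    sk_rf.fit(X_train, y_train)
    print(f"sklearn accuracy: {accuracy_score(y_test, sk_rf.predict(X_test)):.2f}")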