Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move import match to base #8

Merged
merged 4 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 53 additions & 53 deletions spp_import_match/models/base.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,53 @@
# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
import re
from odoo import api, models

class Base(models.AbstractModel):
_inherit = "base"

@api.model
def load(self, fields, data):
usable, field_to_match = self.env["spp.import.match"]._usable_rules(self._name, fields)
if usable:
newdata = list()
if ".id" in fields:
column = fields.index(".id")
fields[column] = "id"
for values in data:
dbid = int(values[column])
values[column] = self.browse(dbid).get_external_id().get(dbid)
import_fields = list(map(models.fix_import_export_id_paths, fields))
converted_data = self._convert_records(
self._extract_records(import_fields, data)
)

if "id" not in fields:
fields.append("id")
import_fields.append(["id"])

clean_fields = [f[0] for f in import_fields]
for dbid, xmlid, record, info in converted_data:
row = dict(zip(clean_fields, data[info["record"]]))

match = self
if xmlid:
row["id"] = xmlid
newdata.append(tuple(row[f] for f in clean_fields))
continue
elif dbid:
match = self.browse(dbid)
else:
match = self.env["spp.import.match"]._match_find(self, record, row)

flat_fields_to_remove = [item for sublist in field_to_match for item in sublist]
for fields_pop in flat_fields_to_remove:
if fields_pop in fields:
fields.remove(fields_pop)

match.export_data(fields)

ext_id = match.get_external_id()
row["id"] = ext_id[match.id] if match else row.get("id", "")
newdata.append(tuple(row[f] for f in fields))
data = newdata
return super(Base, self).load(fields, data)
# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
import re
from odoo import api, models
class Base(models.AbstractModel):
_inherit = "base"
@api.model
def load(self, fields, data):
usable, field_to_match = self.env["spp.import.match"]._usable_rules(self._name, fields)
if usable:
newdata = list()
if ".id" in fields:
column = fields.index(".id")
fields[column] = "id"
for values in data:
dbid = int(values[column])
values[column] = self.browse(dbid).get_external_id().get(dbid)
import_fields = list(map(models.fix_import_export_id_paths, fields))
converted_data = self._convert_records(
self._extract_records(import_fields, data)
)
if "id" not in fields:
fields.append("id")
import_fields.append(["id"])
clean_fields = [f[0] for f in import_fields]
for dbid, xmlid, record, info in converted_data:
row = dict(zip(clean_fields, data[info["record"]]))
match = self
if xmlid:
row["id"] = xmlid
newdata.append(tuple(row[f] for f in clean_fields))
continue
elif dbid:
match = self.browse(dbid)
else:
match = self.env["spp.import.match"]._match_find(self, record, row)
flat_fields_to_remove = [item for sublist in field_to_match for item in sublist]
for fields_pop in flat_fields_to_remove:
if fields_pop in fields:
fields.remove(fields_pop)
match.export_data(fields)
ext_id = match.get_external_id()
row["id"] = ext_id[match.id] if match else row.get("id", "")
newdata.append(tuple(row[f] for f in fields))
data = newdata
return super(Base, self).load(fields, data)
275 changes: 149 additions & 126 deletions spp_import_match/models/import_match.py
Original file line number Diff line number Diff line change
@@ -1,126 +1,149 @@
# Part of OpenSPP. See LICENSE file for full copyright and licensing details.

from odoo import api, fields, models, tools


class SPPImportMatch(models.Model):
_name = "spp.import.match"
_description = "Import Matching"
_order = "sequence, name"

name = fields.Char(compute="_compute_name", store=True, index=True)
sequence = fields.Integer(index=True)
model_id = fields.Many2one(
"ir.model",
"Model",
required=True,
ondelete="cascade",
domain=[("transient", "=", False)],
help="Model for Import Matching",
)
model_name = fields.Char(related="model_id.model")
model_description = fields.Char(related="model_id.name")
field_ids = fields.One2many(
"spp.import.match.fields",
"match_id",
string="Fields",
required=True,
help="Fields to Match in Importing",
)

@api.onchange("model_id")
def _onchange_model_id(self):
for rec in self:
rec.field_ids = None

@api.depends("model_id")
def _compute_name(self):
for rec in self:
name = "New"
if rec.model_id:
name = rec.model_description

rec.name = name

@api.model
def _match_find(self, model, converted_row, imported_row):
usable, field_to_match = self._usable_rules(model._name, converted_row)
usable = self.browse(usable)
for combination in usable:
combination_valid = True
domain = list()
for field in combination.field_ids:
if field.conditional:
if imported_row[field.name] != field.imported_value:
combination_valid = False
break
if field.field_id.name in converted_row:
row_value = converted_row[field.field_id.name]
field_value = field.field_id.name
add_to_domain = True
if field.sub_field_id:
tuple_val = row_value[0][2]
add_to_domain = False
if field.sub_field_id.name in tuple_val:
row_value = tuple_val[field.sub_field_id.name]
add_to_domain = True
field_value = field.field_id.name + "." + field.sub_field_id.name
if add_to_domain:
domain.append((field_value, "=", row_value))
if not combination_valid:
continue
match = model.search(domain)
if len(match) == 1:
return match

return model

@api.model
@tools.ormcache("model_name", "frozenset(fields)")
def _usable_rules(self, model_name, fields):
result = self
available = self.search([("model_name", "=", model_name)])
field_to_match = []
for record in available:
field_to_match.append(record.field_ids.mapped("name"))
for f in record.field_ids:
if f.name in fields or f.field_id.name in fields:
result |= record

return result.ids, field_to_match


class SPPImportMatchFields(models.Model):
_name = "spp.import.match.fields"
_description = "Fields for Import Matching"

name = fields.Char(compute="_compute_name")
field_id = fields.Many2one(
"ir.model.fields",
string="Field",
required=True,
ondelete="cascade",
domain="[('model_id', '=', model_id)]",
help="Fields to Match in Importing",
)
relation = fields.Char(related="field_id.relation")
sub_field_id = fields.Many2one(
"ir.model.fields",
string="Sub-Field",
ondelete="cascade",
help="Sub Fields to Match in Importing",
)
match_id = fields.Many2one("spp.import.match", string="Match", ondelete="cascade")
model_id = fields.Many2one(related="match_id.model_id")
conditional = fields.Boolean()
imported_value = fields.Char(
help="This will be used as a condition to disregard this field if matched"
)

def _compute_name(self):
for rec in self:
name = rec.field_id.name
if rec.sub_field_id:
name = rec.field_id.name + "/" + rec.sub_field_id.name
rec.name = name

# Part of OpenSPP. See LICENSE file for full copyright and licensing details.

import logging
from odoo import _, api, fields, models, tools
from odoo.exceptions import ValidationError

_logger = logging.getLogger(__name__)


class SPPImportMatch(models.Model):
_name = "spp.import.match"
_description = "Import Matching"
_order = "sequence, name"

name = fields.Char(compute="_compute_name", store=True, index=True)
sequence = fields.Integer(index=True)
model_id = fields.Many2one(
"ir.model",
"Model",
required=True,
ondelete="cascade",
domain=[("transient", "=", False)],
help="Model for Import Matching",
)
model_name = fields.Char(related="model_id.model")
model_description = fields.Char(related="model_id.name")
field_ids = fields.One2many(
"spp.import.match.fields",
"match_id",
string="Fields",
required=True,
help="Fields to Match in Importing",
)


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_sql_constraints = [
(
"model_id_uniq",
"UNIQUE(model_id)",
"Model should be unique!",
)
]

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be removed, as we can have multiple Import Match Configuration for a single model.

@api.onchange("model_id")
def _onchange_model_id(self):
for rec in self:
rec.field_ids = None

@api.depends("model_id")
def _compute_name(self):
for rec in self:
name = "New"
if rec.model_id:
name = rec.model_description

rec.name = name

@api.model
def _match_find(self, model, converted_row, imported_row):
usable, field_to_match = self._usable_rules(model._name, converted_row)
usable = self.browse(usable)
for combination in usable:
combination_valid = True
domain = list()
for field in combination.field_ids:
if field.conditional:
if imported_row[field.name] != field.imported_value:
combination_valid = False
break
if field.field_id.name in converted_row:
row_value = converted_row[field.field_id.name]
field_value = field.field_id.name
add_to_domain = True
if field.sub_field_id:
tuple_val = row_value[0][2]
add_to_domain = False
if field.sub_field_id.name in tuple_val:
row_value = tuple_val[field.sub_field_id.name]
add_to_domain = True
field_value = field.field_id.name + "." + field.sub_field_id.name
if add_to_domain:
domain.append((field_value, "=", row_value))
if not combination_valid:
continue
match = model.search(domain)
if len(match) == 1:
return match

return model

@api.model
@tools.ormcache("model_name", "frozenset(fields)")
def _usable_rules(self, model_name, fields):
result = self
available = self.search([("model_name", "=", model_name)])
field_to_match = []
for record in available:
field_to_match.append(record.field_ids.mapped("name"))
for f in record.field_ids:
if f.name in fields or f.field_id.name in fields:
result |= record

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
available = self.search([("model_name", "=", model_name)])
field_to_match = []
for record in available:
field_to_match.append(record.field_ids.mapped("name"))
for f in record.field_ids:
if f.name in fields or f.field_id.name in fields:
result |= record
available = self.search([("model_name", "=", model_name)], limit=1)
if not available:
return [], []
field_to_match = []
for f in available.field_ids:
if f.name in fields or f.field_id.name in fields:
result |= available
field_to_match.append(f.name)

return result.ids, field_to_match


class SPPImportMatchFields(models.Model):
_name = "spp.import.match.fields"
_description = "Fields for Import Matching"

name = fields.Char(compute="_compute_name")
field_id = fields.Many2one(
"ir.model.fields",
string="Field",
required=True,
ondelete="cascade",
domain="[('model_id', '=', model_id)]",
help="Fields to Match in Importing",
)
relation = fields.Char(related="field_id.relation")
sub_field_id = fields.Many2one(
"ir.model.fields",
string="Sub-Field",
ondelete="cascade",
help="Sub Fields to Match in Importing",
)
match_id = fields.Many2one("spp.import.match", string="Match", ondelete="cascade")
model_id = fields.Many2one(related="match_id.model_id")
conditional = fields.Boolean()
imported_value = fields.Char(
help="This will be used as a condition to disregard this field if matched"
)

def _compute_name(self):
for rec in self:
name = rec.field_id.name
if rec.sub_field_id:
name = rec.field_id.name + "/" + rec.sub_field_id.name
rec.name = name

@api.onchange("field_id")
def _onchange_field_id(self):
for rec in self:
field_id = rec.field_id.id
fields_list = []
for field in rec.match_id.field_ids:
new_id_str = str(field.id)
if not new_id_str.find("NewId_'virtual"):
fields_list.append(field.field_id.id)

duplicate_counter = 0
for duplicate_field in fields_list:
if duplicate_field == field_id:
duplicate_counter += 1

if duplicate_counter > 1:
raise ValidationError(_("Field '%s', already exists!") % rec.field_id.field_description)

Loading
Loading