diff --git a/utility_billing/utility_billing/overrides/server/sales_order.py b/utility_billing/utility_billing/overrides/server/sales_order.py index aa0d898..f55c2ff 100755 --- a/utility_billing/utility_billing/overrides/server/sales_order.py +++ b/utility_billing/utility_billing/overrides/server/sales_order.py @@ -116,6 +116,27 @@ def get_unique_customers_and_orders(source_names): @frappe.whitelist() def make_sales_invoice(source_names, target_doc=None, ignore_permissions=False): """Create Sales Invoice from Sales Orders.""" + source_names = parse_source_names(source_names) + + doclist = [] + for source_name in source_names: + try: + current_doc = map_sales_order_to_invoice( + source_name, target_doc, ignore_permissions + ) + doclist.append(current_doc) + except Exception as e: + log_error_and_continue(source_name, e) + + if doclist: + target_invoice = merge_documents(doclist) + finalize_invoice(target_invoice) + + return target_invoice + + +def parse_source_names(source_names): + """Parse and validate the source_names input.""" if isinstance(source_names, str): source_names = json.loads(source_names) @@ -125,140 +146,146 @@ def make_sales_invoice(source_names, target_doc=None, ignore_permissions=False): "Invalid input type for source_names. Expected JSON array, list, or tuple." ) ) + return source_names + + +def map_sales_order_to_invoice(source_name, target_doc, ignore_permissions): + """Map Sales Order to Sales Invoice.""" + return get_mapped_doc( + "Sales Order", + source_name, + { + "Sales Order": { + "doctype": "Sales Invoice", + "field_map": { + "party_account_currency": "party_account_currency", + "payment_terms_template": "payment_terms_template", + }, + "field_no_map": ["payment_terms_template"], + "validation": {"docstatus": ["=", 1]}, + }, + "Sales Order Item": { + "doctype": "Sales Invoice Item", + "field_map": { + "name": "so_detail", + "parent": "sales_order", + }, + "postprocess": update_item, + "condition": lambda doc: doc.qty + and (doc.base_amount == 0 or abs(doc.billed_amt) < abs(doc.amount)), + }, + "Sales Taxes and Charges": { + "doctype": "Sales Taxes and Charges", + "reset_value": True, + }, + "Sales Team": {"doctype": "Sales Team", "add_if_empty": True}, + "Sales Order Meter Reading": { + "doctype": "Sales Invoice Meter Reading", + "field_map": { + "meter_number": "meter_number", + "item_code": "item_code", + "uom": "uom", + "stock_uom": "stock_uom", + "previous_consumption": "previous_consumption", + "current_reading": "current_reading", + "previous_reading": "previous_reading", + }, + "condition": lambda doc: doc.meter_number is not None, + }, + }, + target_doc, + postprocess, + ignore_permissions=ignore_permissions, + ) - def postprocess(source, target): - set_missing_values(source, target) - if target.get("allocate_advances_automatically"): - target.set_advances() - - def set_missing_values(source, target): - target.flags.ignore_permissions = True - target.run_method("set_missing_values") - target.run_method("set_po_nos") - target.run_method("calculate_taxes_and_totals") - target.run_method("set_use_serial_batch_fields") - if source.company_address: - target.update({"company_address": source.company_address}) - else: - target.update(get_company_address(target.company)) +def postprocess(source, target): + """Postprocess to set missing values and calculate totals.""" + set_missing_values(source, target) + if target.get("allocate_advances_automatically"): + target.set_advances() - if target.company_address: - target.update( - get_fetch_values( - "Sales Invoice", "company_address", target.company_address - ) - ) - if source.loyalty_points and source.order_type == "Shopping Cart": - target.redeem_loyalty_points = 1 - target.loyalty_points = source.loyalty_points +def set_missing_values(source, target): + """Set missing values and calculate totals for the invoice.""" + target.flags.ignore_permissions = True + target.run_method("set_missing_values") + target.run_method("set_po_nos") + target.run_method("calculate_taxes_and_totals") + target.run_method("set_use_serial_batch_fields") - target.debit_to = get_party_account("Customer", source.customer, source.company) + if source.company_address: + target.update({"company_address": source.company_address}) + else: + target.update(get_company_address(target.company)) - def update_item(source, target, source_parent): - """Update the item information for the invoice.""" - target.amount = flt(source.amount) - flt(source.billed_amt) - target.base_amount = target.amount * flt(source_parent.conversion_rate) - target.qty = ( - target.amount / flt(source.rate) - if (source.rate and source.billed_amt) - else source.qty - source.returned_qty + if target.company_address: + target.update( + get_fetch_values("Sales Invoice", "company_address", target.company_address) ) - if source_parent.project: - target.cost_center = frappe.db.get_value( - "Project", source_parent.project, "cost_center" - ) - if target.item_code: - item = get_item_defaults(target.item_code, source_parent.company) - item_group = get_item_group_defaults( - target.item_code, source_parent.company - ) - cost_center = item.get("selling_cost_center") or item_group.get( - "selling_cost_center" - ) + if source.loyalty_points and source.order_type == "Shopping Cart": + target.redeem_loyalty_points = 1 + target.loyalty_points = source.loyalty_points - if cost_center: - target.cost_center = cost_center + target.debit_to = get_party_account("Customer", source.customer, source.company) - doclist = [] - for source_name in source_names: - try: - current_doc = get_mapped_doc( - "Sales Order", - source_name, - { - "Sales Order": { - "doctype": "Sales Invoice", - "field_map": { - "party_account_currency": "party_account_currency", - "payment_terms_template": "payment_terms_template", - }, - "field_no_map": ["payment_terms_template"], - "validation": {"docstatus": ["=", 1]}, - }, - "Sales Order Item": { - "doctype": "Sales Invoice Item", - "field_map": { - "name": "so_detail", - "parent": "sales_order", - }, - "postprocess": update_item, - "condition": lambda doc: doc.qty - and ( - doc.base_amount == 0 - or abs(doc.billed_amt) < abs(doc.amount) - ), - }, - "Sales Taxes and Charges": { - "doctype": "Sales Taxes and Charges", - "reset_value": True, - }, - "Sales Team": {"doctype": "Sales Team", "add_if_empty": True}, - "Sales Order Meter Reading": { - "doctype": "Sales Invoice Meter Reading", - "field_map": { - "meter_number": "meter_number", - "item_code": "item_code", - "uom": "uom", - "stock_uom": "stock_uom", - "previous_consumption": "previous_consumption", - "current_reading": "current_reading", - "previous_reading": "previous_reading", - }, - "condition": lambda doc: doc.meter_number is not None, - }, - }, - target_doc, - postprocess, - ignore_permissions=ignore_permissions, - ) +def update_item(source, target, source_parent): + """Update the item details for the invoice.""" + target.amount = flt(source.amount) - flt(source.billed_amt) + target.base_amount = target.amount * flt(source_parent.conversion_rate) + target.qty = ( + target.amount / flt(source.rate) + if (source.rate and source.billed_amt) + else source.qty - source.returned_qty + ) - doclist.append(current_doc) + set_cost_center(source_parent, target) - except Exception as e: - frappe.log_error( - message=str(e), title=f"Error processing Sales Order {source_name}" - ) - create_log(source_name, e, "Sales Order", "Sales Invoice", "Failed") - continue - if doclist: - target_invoice = doclist[0] - for doc in doclist[1:]: - merge_invoice_items(target_invoice, doc) - merge_invoice_taxes(target_invoice, doc) +def set_cost_center(source_parent, target): + """Set the cost center for the invoice item.""" + if source_parent.project: + target.cost_center = frappe.db.get_value( + "Project", source_parent.project, "cost_center" + ) + + if target.item_code: + item = get_item_defaults(target.item_code, source_parent.company) + item_group = get_item_group_defaults(target.item_code, source_parent.company) + cost_center = item.get("selling_cost_center") or item_group.get( + "selling_cost_center" + ) - target_invoice.run_method("calculate_taxes_and_totals") - target_invoice.run_method("set_payment_schedule") + if cost_center: + target.cost_center = cost_center - target_invoice.save() +def log_error_and_continue(source_name, e): + """Log errors and continue processing other Sales Orders.""" + frappe.log_error( + message=str(e), title=f"Error processing Sales Order {source_name}" + ) + create_log(source_name, e, "Sales Order", "Sales Invoice", "Failed") + + +def merge_documents(doclist): + """Merge multiple invoices into one.""" + target_invoice = doclist[0] + for doc in doclist[1:]: + merge_invoice_items(target_invoice, doc) + merge_invoice_taxes(target_invoice, doc) return target_invoice +def finalize_invoice(invoice): + """Calculate totals and save the invoice.""" + invoice.run_method("calculate_taxes_and_totals") + invoice.run_method("set_payment_schedule") + invoice.save() + + def merge_invoice_items(target_invoice, doc): """Merge items from multiple docs into a single invoice.""" for item in doc.items: diff --git a/utility_billing/utility_billing/utils/create_meter_reading_rates.py b/utility_billing/utility_billing/utils/create_meter_reading_rates.py index a956356..b735a25 100644 --- a/utility_billing/utility_billing/utils/create_meter_reading_rates.py +++ b/utility_billing/utility_billing/utils/create_meter_reading_rates.py @@ -1,85 +1,121 @@ import frappe +@frappe.whitelist() def create_meter_reading_rates(meter_reading, price_list, reading_date): """Create Meter Reading Tariff Rate documents based on the Meter Reading date and append to its rates child table.""" meter_reading.set("rates", []) for item in meter_reading.items: - item_prices = frappe.get_list( - "Item Price", - filters={ - "item_code": item.item_code, - "price_list": price_list, - "valid_from": ("<=", reading_date), - "valid_upto": (">=", reading_date), - }, - fields=["name", "tariffs", "is_fixed_meter_charge"], - ) + item_prices = get_item_prices(item.item_code, price_list, reading_date) if not item_prices: - frappe.throw( - ( - "No valid pricing available for Item {0} on Price List {1} as of {2}" - ).format(item.item_code, price_list, reading_date) - ) + raise_no_pricing_error(item.item_code, price_list, reading_date) total_consumption = item.consumption + for item_price in item_prices: item_price_doc = frappe.get_doc("Item Price", item_price.name) - is_fixed_meter_charge = item_price_doc.is_fixed_meter_charge - - if is_fixed_meter_charge == 1: - first_tariff = item_price_doc.tariffs[0] - - if total_consumption > first_tariff.upper_limit: - frappe.throw( - ( - "Consumption for Item {0} exceeds the upper limit of the fixed charge slab ({1})." - ).format(item.item_code, first_tariff.upper_limit) - ) - - slab_quantity = first_tariff.upper_limit - rate = first_tariff.rate - amount = slab_quantity * rate - - meter_reading.append( - "rates", - { - "meter_reading_item": item.name, - "item_code": item.item_code, - "block": first_tariff.block, - "qty": slab_quantity, - "amount": amount, - "rate": rate, - }, + if is_fixed_meter_charge(item_price_doc): + process_fixed_meter_charge( + meter_reading, item, item_price_doc, total_consumption ) else: - for tariff in item_price_doc.tariffs: - if total_consumption <= 0: - break - - slab_quantity = min( - total_consumption, tariff.upper_limit - tariff.lower_limit - ) - - if slab_quantity <= 0: - continue - - rate = tariff.rate - amount = slab_quantity * rate - - meter_reading.append( - "rates", - { - "meter_reading_item": item.name, - "item_code": item.item_code, - "block": tariff.block, - "qty": slab_quantity, - "amount": amount, - "rate": rate, - }, - ) - - total_consumption -= slab_quantity + process_tariff_charges( + meter_reading, item, item_price_doc, total_consumption + ) + + +def get_item_prices(item_code, price_list, reading_date): + """Fetch item prices for the given item code and reading date.""" + return frappe.get_list( + "Item Price", + filters={ + "item_code": item_code, + "price_list": price_list, + "valid_from": ("<=", reading_date), + "valid_upto": (">=", reading_date), + }, + fields=["name", "tariffs", "is_fixed_meter_charge"], + ) + + +def raise_no_pricing_error(item_code, price_list, reading_date): + """Raise an error if no valid pricing is available.""" + frappe.throw( + ("No valid pricing available for Item {0} on Price List {1} as of {2}").format( + item_code, price_list, reading_date + ) + ) + + +def is_fixed_meter_charge(item_price_doc): + """Check if the item price document has a fixed meter charge.""" + return item_price_doc.is_fixed_meter_charge == 1 + + +def process_fixed_meter_charge(meter_reading, item, item_price_doc, total_consumption): + """Process fixed meter charges and append rates to meter reading.""" + first_tariff = item_price_doc.tariffs[0] + + if total_consumption > first_tariff.upper_limit: + raise_exceeds_upper_limit_error(item.item_code, first_tariff.upper_limit) + + slab_quantity = first_tariff.upper_limit + rate = first_tariff.rate + amount = slab_quantity * rate + + append_meter_reading_rate( + meter_reading, item, first_tariff.block, slab_quantity, rate, amount + ) + + +def raise_exceeds_upper_limit_error(item_code, upper_limit): + """Raise an error if the consumption exceeds the upper limit of a fixed charge slab.""" + frappe.throw( + ( + "Consumption for Item {0} exceeds the upper limit of the fixed charge slab ({1})." + ).format(item_code, upper_limit) + ) + + +def process_tariff_charges(meter_reading, item, item_price_doc, total_consumption): + """Process non-fixed tariff charges based on consumption.""" + for tariff in item_price_doc.tariffs: + if total_consumption <= 0: + break + + slab_quantity = calculate_slab_quantity(total_consumption, tariff) + + if slab_quantity <= 0: + continue + + rate = tariff.rate + amount = slab_quantity * rate + + append_meter_reading_rate( + meter_reading, item, tariff.block, slab_quantity, rate, amount + ) + + total_consumption -= slab_quantity + + +def calculate_slab_quantity(total_consumption, tariff): + """Calculate the quantity for a specific tariff slab.""" + return min(total_consumption, tariff.upper_limit - tariff.lower_limit) + + +def append_meter_reading_rate(meter_reading, item, block, slab_quantity, rate, amount): + """Append a rate entry to the meter reading.""" + meter_reading.append( + "rates", + { + "meter_reading_item": item.name, + "item_code": item.item_code, + "block": block, + "qty": slab_quantity, + "amount": amount, + "rate": rate, + }, + )